perf: maximize AI batches by token budget
Batch AI moderation by estimated token budget instead of fixed message count. Send as many messages as fit within an 80k token request budget while keeping one concurrent API request. Include message metadata and chronological conversation context so the model can judge provocation and replies from surrounding discussion. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -10,6 +10,8 @@ const queuedMessageIds = new Set<string>();
|
||||
let isProcessing = false;
|
||||
let activeRequests = 0;
|
||||
const MAX_CONCURRENT_REQUESTS = 1;
|
||||
const MAX_AI_REQUEST_TOKENS = 80_000;
|
||||
const AI_PROMPT_TOKEN_RESERVE = 6_000;
|
||||
|
||||
interface ChatCompletionResponse {
|
||||
choices?: Array<{
|
||||
@@ -30,6 +32,20 @@ function getAnalysisText(message: MessageRecord): string {
|
||||
return (message.edited_content || message.content || "").trim();
|
||||
}
|
||||
|
||||
function estimateTokens(text: string): number {
|
||||
return Math.ceil(text.length / 4);
|
||||
}
|
||||
|
||||
function formatMessageForAnalysis(message: MessageRecord, index: number): string {
|
||||
const text = getAnalysisText(message);
|
||||
const time = new Date(message.created_at).toISOString();
|
||||
return `${index + 1}. id=${message.id} time=${time} user=${message.username}: ${text}`;
|
||||
}
|
||||
|
||||
function estimateMessageTokens(message: MessageRecord): number {
|
||||
return estimateTokens(formatMessageForAnalysis(message, 0)) + 16;
|
||||
}
|
||||
|
||||
async function fetchJson(url: string, init: RequestInit): Promise<unknown> {
|
||||
const controller = new AbortController();
|
||||
const timeout = setTimeout(() => controller.abort(), config.AI_ANALYSIS_TIMEOUT_MS);
|
||||
@@ -86,7 +102,7 @@ function parseLLMAnalysis(content: string): LLMAnalysis {
|
||||
};
|
||||
}
|
||||
|
||||
async function runLLMAnalysis(texts: string[]): Promise<{ results: LLMAnalysis[]; raw: unknown }> {
|
||||
async function runLLMAnalysis(messages: MessageRecord[]): Promise<{ results: LLMAnalysis[]; raw: unknown }> {
|
||||
const response = await retryWithBackoff(
|
||||
() => fetchJson(`${config.AI_LLM_BASE_URL}/chat/completions`, {
|
||||
method: "POST",
|
||||
@@ -139,6 +155,8 @@ ATURAN KOMUNITAS LENGKAP:
|
||||
KONTEKS KOMUNITAS:
|
||||
- Ini grup bercanda/santai, jadi slang, candaan ringan, kata kasar ringan tanpa target, pesan pendek seperti "." atau "P", dan pertanyaan tidak jelas tetap CLEAN
|
||||
- Jangan beri WARN hanya karena pesan singkat, informal, ambigu, low-quality, atau kurang konteks
|
||||
- Pahami alur pembahasan antar pesan: pesan yang sendiri terlihat normal bisa WARN/FLAGGED jika dalam konteks percakapan sedang memancing konflik, menormalisasi pelanggaran, atau melanjutkan provokasi
|
||||
- Jangan menghukum orang yang sedang menasehati, menjelaskan bahaya, mengutip, atau menolak tindakan buruk; nilai maksud dan konteksnya
|
||||
- WARN hanya jika ada orang/kelompok yang diserang, dihina, diprovokasi, atau konflik mulai dipancing
|
||||
|
||||
PENENTUAN STATUS:
|
||||
@@ -150,7 +168,7 @@ Satu JSON object per pesan dalam array.`,
|
||||
},
|
||||
{
|
||||
role: "user",
|
||||
content: `Analisis ${texts.length} pesan berikut:\n${texts.map((t, i) => `${i + 1}. ${t}`).join("\n")}`,
|
||||
content: `Analisis ${messages.length} pesan berikut sebagai satu alur percakapan. Tetap kembalikan satu hasil per pesan dengan urutan yang sama:\n${messages.map(formatMessageForAnalysis).join("\n")}`,
|
||||
},
|
||||
],
|
||||
temperature: 0.2,
|
||||
@@ -188,7 +206,7 @@ Satu JSON object per pesan dalam array.`,
|
||||
|
||||
// If batch parsing failed, parse as individual responses
|
||||
if (results.length === 0) {
|
||||
results = texts.map(() => parseLLMAnalysis(content));
|
||||
results = messages.map(() => parseLLMAnalysis(content));
|
||||
}
|
||||
|
||||
return { results, raw: response };
|
||||
@@ -197,15 +215,15 @@ Satu JSON object per pesan dalam array.`,
|
||||
async function analyzeAndStoreBatch(db: SqliteDatabase, messages: MessageRecord[]): Promise<void> {
|
||||
if (messages.length === 0) return;
|
||||
|
||||
const texts = messages.map(getAnalysisText).filter((t) => t.length > 0);
|
||||
if (texts.length === 0) return;
|
||||
const analyzableMessages = messages.filter((message) => getAnalysisText(message).length > 0);
|
||||
if (analyzableMessages.length === 0) return;
|
||||
|
||||
activeRequests++;
|
||||
try {
|
||||
const { results, raw } = await runLLMAnalysis(texts);
|
||||
const { results, raw } = await runLLMAnalysis(analyzableMessages);
|
||||
|
||||
for (let i = 0; i < messages.length; i++) {
|
||||
const message = messages[i];
|
||||
for (let i = 0; i < analyzableMessages.length; i++) {
|
||||
const message = analyzableMessages[i];
|
||||
const result = results[i] || parseLLMAnalysis("");
|
||||
|
||||
const row = updateMessageAIAnalysis(db, message.id, {
|
||||
@@ -221,7 +239,7 @@ async function analyzeAndStoreBatch(db: SqliteDatabase, messages: MessageRecord[
|
||||
}
|
||||
} catch (error) {
|
||||
const errorMsg = error instanceof Error ? error.message : String(error);
|
||||
for (const message of messages) {
|
||||
for (const message of analyzableMessages) {
|
||||
const row = updateMessageAIAnalysis(db, message.id, {
|
||||
status: "error",
|
||||
flags: null,
|
||||
@@ -243,24 +261,32 @@ async function drainQueue(db: SqliteDatabase): Promise<void> {
|
||||
if (isProcessing) return;
|
||||
isProcessing = true;
|
||||
try {
|
||||
const BATCH_SIZE = 5;
|
||||
const batchTokenLimit = MAX_AI_REQUEST_TOKENS - AI_PROMPT_TOKEN_RESERVE;
|
||||
|
||||
while (queuedMessageIds.size > 0) {
|
||||
// Wait if at max concurrent requests
|
||||
while (activeRequests >= MAX_CONCURRENT_REQUESTS) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
}
|
||||
|
||||
// Collect batch of messages
|
||||
const batch: MessageRecord[] = [];
|
||||
for (const messageId of queuedMessageIds) {
|
||||
if (batch.length >= BATCH_SIZE) break;
|
||||
queuedMessageIds.delete(messageId);
|
||||
let tokenEstimate = 0;
|
||||
for (const messageId of Array.from(queuedMessageIds)) {
|
||||
const message = getMessageById(db, messageId);
|
||||
if (message) batch.push(message);
|
||||
queuedMessageIds.delete(messageId);
|
||||
if (!message) continue;
|
||||
|
||||
const messageTokens = estimateMessageTokens(message);
|
||||
if (batch.length > 0 && tokenEstimate + messageTokens > batchTokenLimit) {
|
||||
queuedMessageIds.add(messageId);
|
||||
break;
|
||||
}
|
||||
|
||||
batch.push(message);
|
||||
tokenEstimate += messageTokens;
|
||||
}
|
||||
|
||||
if (batch.length > 0) {
|
||||
logger.info({ count: batch.length, tokenEstimate }, "Processing AI analysis batch");
|
||||
await analyzeAndStoreBatch(db, batch);
|
||||
}
|
||||
}
|
||||
@@ -287,7 +313,7 @@ export function startPendingAIAnalysisWorker(db: SqliteDatabase): void {
|
||||
logger.info("AI analysis worker started");
|
||||
setInterval(() => {
|
||||
if (isProcessing) return;
|
||||
const pendingMessages = getPendingAIAnalysisMessages(db, 3);
|
||||
const pendingMessages = getPendingAIAnalysisMessages(db, 500);
|
||||
if (pendingMessages.length === 0) return;
|
||||
logger.info({ count: pendingMessages.length }, "Queueing pending AI analysis messages");
|
||||
for (const message of pendingMessages) {
|
||||
|
||||
Reference in New Issue
Block a user