fix: harden analysis queue scheduling
This commit is contained in:
@@ -3,6 +3,7 @@ import { createChildLogger } from "../logger";
|
||||
import { retryWithBackoff } from "../retry";
|
||||
import {
|
||||
getConversationContextBefore,
|
||||
getMessageById,
|
||||
getPendingConversationKeys,
|
||||
getPendingMessagesByConversation,
|
||||
updateMessageAIAnalysis,
|
||||
@@ -15,13 +16,17 @@ const logger = createChildLogger("ai-analyzer");
|
||||
|
||||
// Debounce state per conversation key
|
||||
const conversationDebounceTimers = new Map<string, NodeJS.Timeout>();
|
||||
const conversationPendingBatches = new Map<string, Set<string>>();
|
||||
// Track conversations currently being processed
|
||||
const conversationProcessing = new Set<string>();
|
||||
// Track conversations in error cooldown (failed recently)
|
||||
const conversationErrorCooldown = new Map<string, number>();
|
||||
|
||||
let activeRequests = 0;
|
||||
let lastError: string | null = null;
|
||||
const MAX_ACTIVE_REQUESTS = 1;
|
||||
const DEBOUNCE_MS = 1500;
|
||||
const RECOVERY_INTERVAL_MS = 15000;
|
||||
const ERROR_COOLDOWN_MS = 30000;
|
||||
const MAX_CONTEXT_TOKENS = 8000;
|
||||
const MAX_BATCH_SIZE = 25;
|
||||
|
||||
@@ -68,6 +73,7 @@ async function processBatch(
|
||||
if (messages.length === 0) return;
|
||||
|
||||
activeRequests++;
|
||||
conversationProcessing.add(conversationKey);
|
||||
try {
|
||||
// Get context before the first message
|
||||
const firstMessage = messages[0];
|
||||
@@ -115,6 +121,9 @@ async function processBatch(
|
||||
(globalThis as any).broadcastMessageAnalyzed?.(row);
|
||||
}
|
||||
|
||||
// Clear error cooldown on success
|
||||
conversationErrorCooldown.delete(conversationKey);
|
||||
|
||||
logger.info(
|
||||
{ conversationKey, count: messages.length },
|
||||
"Batch analysis complete",
|
||||
@@ -142,8 +151,15 @@ async function processBatch(
|
||||
(globalThis as any).broadcastMessageAnalyzed?.(row);
|
||||
}
|
||||
}
|
||||
|
||||
// Set error cooldown for this conversation
|
||||
conversationErrorCooldown.set(
|
||||
conversationKey,
|
||||
Date.now() + ERROR_COOLDOWN_MS,
|
||||
);
|
||||
} finally {
|
||||
activeRequests--;
|
||||
conversationProcessing.delete(conversationKey);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,6 +167,25 @@ async function processBatch(
|
||||
* Debounced analysis trigger for a conversation
|
||||
*/
|
||||
function scheduleConversationAnalysis(conversationKey: string): void {
|
||||
// Skip if already processing
|
||||
if (conversationProcessing.has(conversationKey)) {
|
||||
logger.debug(
|
||||
{ conversationKey },
|
||||
"Conversation already processing, skipping schedule",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Skip if in error cooldown
|
||||
const cooldownUntil = conversationErrorCooldown.get(conversationKey);
|
||||
if (cooldownUntil && Date.now() < cooldownUntil) {
|
||||
logger.debug(
|
||||
{ conversationKey, cooldownMs: cooldownUntil - Date.now() },
|
||||
"Conversation in error cooldown, skipping schedule",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Clear existing timer
|
||||
const existingTimer = conversationDebounceTimers.get(conversationKey);
|
||||
if (existingTimer) {
|
||||
@@ -161,9 +196,14 @@ function scheduleConversationAnalysis(conversationKey: string): void {
|
||||
const timer = setTimeout(async () => {
|
||||
conversationDebounceTimers.delete(conversationKey);
|
||||
|
||||
// Wait for active requests to complete
|
||||
while (activeRequests >= MAX_ACTIVE_REQUESTS) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
// If activeRequests >= MAX_ACTIVE_REQUESTS, requeue instead of waiting
|
||||
if (activeRequests >= MAX_ACTIVE_REQUESTS) {
|
||||
logger.debug(
|
||||
{ conversationKey, activeRequests },
|
||||
"Max active requests reached, requeuing conversation",
|
||||
);
|
||||
scheduleConversationAnalysis(conversationKey);
|
||||
return;
|
||||
}
|
||||
|
||||
// Get pending messages for this conversation
|
||||
@@ -175,9 +215,6 @@ function scheduleConversationAnalysis(conversationKey: string): void {
|
||||
if (messages.length > 0) {
|
||||
await processBatch(conversationKey, messages);
|
||||
}
|
||||
|
||||
// Clear pending batch
|
||||
conversationPendingBatches.delete(conversationKey);
|
||||
}, DEBOUNCE_MS);
|
||||
|
||||
conversationDebounceTimers.set(conversationKey, timer);
|
||||
@@ -186,13 +223,31 @@ function scheduleConversationAnalysis(conversationKey: string): void {
|
||||
/**
|
||||
* Queues a message for analysis (debounced by conversation)
|
||||
*/
|
||||
export function queueMessageAnalysis(messageId: string): void {
|
||||
export async function queueMessageAnalysis(messageId: string): Promise<void> {
|
||||
if (!config.AI_ANALYSIS_ENABLED) return;
|
||||
|
||||
logger.debug({ messageId }, "Queueing message for analysis");
|
||||
|
||||
// Note: We don't have the message here, so we'll rely on recovery interval
|
||||
// to pick it up from the database
|
||||
try {
|
||||
// Look up the message to get its conversation key
|
||||
const message = await getMessageById(messageId);
|
||||
if (!message) {
|
||||
logger.warn({ messageId }, "Message not found for analysis queue");
|
||||
return;
|
||||
}
|
||||
|
||||
// Schedule its conversation for analysis
|
||||
const conversationKey = getConversationKey(message);
|
||||
queueConversationAnalysis(conversationKey);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{
|
||||
messageId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
},
|
||||
"Failed to queue message for analysis",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -203,11 +258,6 @@ export function queueConversationAnalysis(conversationKey: string): void {
|
||||
|
||||
logger.debug({ conversationKey }, "Queueing conversation for analysis");
|
||||
|
||||
// Track pending batch
|
||||
if (!conversationPendingBatches.has(conversationKey)) {
|
||||
conversationPendingBatches.set(conversationKey, new Set());
|
||||
}
|
||||
|
||||
// Schedule debounced analysis
|
||||
scheduleConversationAnalysis(conversationKey);
|
||||
}
|
||||
@@ -240,14 +290,27 @@ export function startPendingAIAnalysisWorker(): void {
|
||||
const conversationKeys = await getPendingConversationKeys(100);
|
||||
|
||||
for (const key of conversationKeys) {
|
||||
// Only schedule if not already scheduled
|
||||
if (!conversationDebounceTimers.has(key)) {
|
||||
logger.debug(
|
||||
{ conversationKey: key },
|
||||
"Recovering pending conversation",
|
||||
);
|
||||
scheduleConversationAnalysis(key);
|
||||
// Skip if already scheduled
|
||||
if (conversationDebounceTimers.has(key)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip if currently processing
|
||||
if (conversationProcessing.has(key)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip if in error cooldown
|
||||
const cooldownUntil = conversationErrorCooldown.get(key);
|
||||
if (cooldownUntil && Date.now() < cooldownUntil) {
|
||||
continue;
|
||||
}
|
||||
|
||||
logger.debug(
|
||||
{ conversationKey: key },
|
||||
"Recovering pending conversation",
|
||||
);
|
||||
scheduleConversationAnalysis(key);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error({ error }, "Pending AI analysis recovery worker failed");
|
||||
|
||||
@@ -509,17 +509,16 @@ export async function getPendingMessagesByConversation(
|
||||
const db = getDatabase() as any;
|
||||
|
||||
// conversationKey is either thread_id or channel_id
|
||||
const isThreadId = conversationKey.startsWith("t");
|
||||
const condition = isThreadId
|
||||
? eq(messagesTable.thread_id, conversationKey)
|
||||
: eq(messagesTable.channel_id, conversationKey);
|
||||
|
||||
// Query both to safely handle the key
|
||||
const rows = await db
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(
|
||||
and(
|
||||
condition,
|
||||
or(
|
||||
eq(messagesTable.thread_id, conversationKey),
|
||||
eq(messagesTable.channel_id, conversationKey),
|
||||
),
|
||||
eq(messagesTable.ai_status, "pending"),
|
||||
isNull(messagesTable.deleted_at),
|
||||
),
|
||||
|
||||
Reference in New Issue
Block a user