feat: enhance media handling and audio processing logic
This commit is contained in:
@@ -110,11 +110,6 @@ async function processBatch(
|
||||
}
|
||||
|
||||
conversationErrorCooldown.delete(conversationKey);
|
||||
|
||||
logger.info(
|
||||
{ conversationKey, count: messages.length },
|
||||
"Batch analysis complete",
|
||||
);
|
||||
} catch (error) {
|
||||
lastError = error instanceof Error ? error.message : String(error);
|
||||
conversationErrorCooldown.set(
|
||||
@@ -173,20 +168,12 @@ async function runAnalysisInWorker(
|
||||
function scheduleConversationAnalysis(conversationKey: string): void {
|
||||
// Skip if already processing
|
||||
if (conversationProcessing.has(conversationKey)) {
|
||||
logger.debug(
|
||||
{ conversationKey },
|
||||
"Conversation already processing, skipping schedule",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Skip if in error cooldown
|
||||
const cooldownUntil = conversationErrorCooldown.get(conversationKey);
|
||||
if (cooldownUntil && Date.now() < cooldownUntil) {
|
||||
logger.debug(
|
||||
{ conversationKey, cooldownMs: cooldownUntil - Date.now() },
|
||||
"Conversation in error cooldown, skipping schedule",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -202,10 +189,6 @@ function scheduleConversationAnalysis(conversationKey: string): void {
|
||||
|
||||
// If activeRequests >= MAX_ACTIVE_REQUESTS, requeue instead of waiting
|
||||
if (activeRequests >= MAX_ACTIVE_REQUESTS) {
|
||||
logger.debug(
|
||||
{ conversationKey, activeRequests },
|
||||
"Max active requests reached, requeuing conversation",
|
||||
);
|
||||
scheduleConversationAnalysis(conversationKey);
|
||||
return;
|
||||
}
|
||||
@@ -230,7 +213,6 @@ function scheduleConversationAnalysis(conversationKey: string): void {
|
||||
export async function queueMessageAnalysis(messageId: string): Promise<void> {
|
||||
if (!config.AI_ANALYSIS_ENABLED) return;
|
||||
|
||||
logger.debug({ messageId }, "Queueing message for analysis");
|
||||
|
||||
try {
|
||||
// Look up the message to get its conversation key
|
||||
@@ -260,7 +242,6 @@ export async function queueMessageAnalysis(messageId: string): Promise<void> {
|
||||
export function queueConversationAnalysis(conversationKey: string): void {
|
||||
if (!config.AI_ANALYSIS_ENABLED) return;
|
||||
|
||||
logger.debug({ conversationKey }, "Queueing conversation for analysis");
|
||||
|
||||
// Schedule debounced analysis
|
||||
scheduleConversationAnalysis(conversationKey);
|
||||
@@ -281,12 +262,7 @@ export function getAnalysisQueueStatus(): AnalysisQueueStatus {
|
||||
* Starts the pending AI analysis recovery worker
|
||||
*/
|
||||
export function startPendingAIAnalysisWorker(): void {
|
||||
if (!config.AI_ANALYSIS_ENABLED) {
|
||||
logger.info("AI analysis disabled");
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info("AI analysis worker started");
|
||||
if (!config.AI_ANALYSIS_ENABLED) return;
|
||||
|
||||
setInterval(async () => {
|
||||
try {
|
||||
@@ -310,10 +286,6 @@ export function startPendingAIAnalysisWorker(): void {
|
||||
continue;
|
||||
}
|
||||
|
||||
logger.debug(
|
||||
{ conversationKey: key },
|
||||
"Recovering pending conversation",
|
||||
);
|
||||
scheduleConversationAnalysis(key);
|
||||
}
|
||||
} catch (error) {
|
||||
|
||||
@@ -82,12 +82,7 @@ export async function uploadAttachmentToPicser(
|
||||
},
|
||||
);
|
||||
|
||||
const parsed = parseUploadResponse(response);
|
||||
logger.info(
|
||||
{ filename, url: parsed.url },
|
||||
"Attachment uploaded successfully",
|
||||
);
|
||||
return parsed;
|
||||
return parseUploadResponse(response);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{
|
||||
@@ -127,8 +122,6 @@ export async function processAttachmentUpload(
|
||||
filename: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
logger.info({ attachmentId, filename }, "Starting attachment upload");
|
||||
|
||||
const buffer = await downloadDiscordAttachment(discordUrl);
|
||||
|
||||
const sizeMb = buffer.length / (1024 * 1024);
|
||||
@@ -141,10 +134,6 @@ export async function processAttachmentUpload(
|
||||
const result = await uploadAttachmentToPicser(buffer, filename);
|
||||
|
||||
await updateAttachmentAsUploaded(attachmentId, result.url, Date.now());
|
||||
logger.info(
|
||||
{ attachmentId, uploadedUrl: result.url },
|
||||
"Attachment upload completed",
|
||||
);
|
||||
} catch (error) {
|
||||
const errorMsg = error instanceof Error ? error.message : String(error);
|
||||
await updateAttachmentAsFailedUpload(attachmentId, errorMsg);
|
||||
|
||||
@@ -73,11 +73,6 @@ export async function syncBacklogMessages(client: Client): Promise<void> {
|
||||
await syncSelectedChannelBacklog(client, guild.id, config.TEXT_CHANNEL_ID);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info(
|
||||
{ guildId: guild.id },
|
||||
"Backlog sync ready (will sync on-demand per selected channel)",
|
||||
);
|
||||
}
|
||||
|
||||
export async function syncSelectedChannelBacklog(
|
||||
@@ -102,17 +97,8 @@ export async function syncSelectedChannelBacklog(
|
||||
}
|
||||
|
||||
const cutoffTime = Date.now() - config.BACKLOG_SYNC_HOURS * 60 * 60 * 1000;
|
||||
logger.info(
|
||||
{ guildId, channelId, hours: config.BACKLOG_SYNC_HOURS },
|
||||
"Starting backlog sync for selected channel",
|
||||
);
|
||||
|
||||
try {
|
||||
const count = await syncChannelMessages(channel, cutoffTime);
|
||||
logger.info(
|
||||
{ channelId, count },
|
||||
"Backlog sync completed for selected channel",
|
||||
);
|
||||
return count;
|
||||
} catch (error) {
|
||||
logger.warn(
|
||||
|
||||
@@ -123,15 +123,6 @@ export async function captureMessage(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(
|
||||
{
|
||||
messageId: message.id,
|
||||
channelId: message.channelId,
|
||||
attachmentCount: message.attachments.size,
|
||||
},
|
||||
"Message captured",
|
||||
);
|
||||
}
|
||||
|
||||
export function registerMessageCapture(client: Client): void {
|
||||
@@ -206,8 +197,6 @@ export function registerMessageCapture(client: Client): void {
|
||||
deleted_at: deletedAt,
|
||||
});
|
||||
}
|
||||
|
||||
logger.info({ messageId: message.id }, "Message deletion captured");
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{
|
||||
@@ -218,6 +207,4 @@ export function registerMessageCapture(client: Client): void {
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
logger.info("Message capture handlers registered");
|
||||
}
|
||||
|
||||
@@ -61,11 +61,6 @@ export async function insertMessage(message: MessageRecord): Promise<void> {
|
||||
try {
|
||||
const database = db();
|
||||
await database.insert(messagesTable).values(message).onConflictDoNothing();
|
||||
|
||||
logger.debug(
|
||||
{ messageId: message.id, channelId: message.channel_id },
|
||||
"Message inserted",
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{
|
||||
@@ -94,12 +89,7 @@ export async function upsertMessageForCapture(
|
||||
.onConflictDoNothing()
|
||||
.returning({ id: messagesTable.id });
|
||||
|
||||
const inserted = rows.length > 0;
|
||||
logger.debug(
|
||||
{ messageId: message.id, channelId: message.channel_id, inserted },
|
||||
inserted ? "Message inserted for capture" : "Message already captured",
|
||||
);
|
||||
return inserted;
|
||||
return rows.length > 0;
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{
|
||||
@@ -134,8 +124,6 @@ export async function updateMessageAsEdited(
|
||||
ai_error: null,
|
||||
})
|
||||
.where(eq(messagesTable.id, messageId));
|
||||
|
||||
logger.debug({ messageId }, "Message marked as edited");
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{
|
||||
@@ -161,8 +149,6 @@ export async function updateMessageAsDeleted(
|
||||
type: "deleted",
|
||||
})
|
||||
.where(eq(messagesTable.id, messageId));
|
||||
|
||||
logger.debug({ messageId }, "Message marked as deleted");
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{
|
||||
@@ -217,11 +203,6 @@ export async function insertAttachment(
|
||||
.insert(attachmentsTable)
|
||||
.values(attachment)
|
||||
.onConflictDoNothing();
|
||||
|
||||
logger.debug(
|
||||
{ attachmentId: attachment.id, messageId: attachment.message_id },
|
||||
"Attachment inserted",
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{
|
||||
@@ -282,11 +263,6 @@ export async function updateAttachmentAsUploaded(
|
||||
uploaded_at: uploadedAt,
|
||||
})
|
||||
.where(eq(attachmentsTable.id, attachmentId));
|
||||
|
||||
logger.debug(
|
||||
{ attachmentId, uploadedUrl },
|
||||
"Attachment marked as uploaded",
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{
|
||||
@@ -312,8 +288,6 @@ export async function updateAttachmentAsFailedUpload(
|
||||
upload_error: error,
|
||||
})
|
||||
.where(eq(attachmentsTable.id, attachmentId));
|
||||
|
||||
logger.debug({ attachmentId, error }, "Attachment marked as failed upload");
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user