refactor: keep message capture on fast path
This commit is contained in:
@@ -1,8 +1,5 @@
|
|||||||
import type { Client, Message } from "discord.js-selfbot-v13";
|
import type { Client, Message } from "discord.js-selfbot-v13";
|
||||||
import { eq } from "drizzle-orm";
|
|
||||||
import { config } from "../config";
|
import { config } from "../config";
|
||||||
import { getDatabase } from "../database/drizzle";
|
|
||||||
import { messagesTable } from "../database/schema";
|
|
||||||
import { createChildLogger } from "../logger";
|
import { createChildLogger } from "../logger";
|
||||||
import { queueMessageAnalysis } from "./aiAnalyzer";
|
import { queueMessageAnalysis } from "./aiAnalyzer";
|
||||||
import {
|
import {
|
||||||
@@ -10,8 +7,18 @@ import {
|
|||||||
getMessageLocation,
|
getMessageLocation,
|
||||||
getMessageMetadata,
|
getMessageMetadata,
|
||||||
} from "./messageMetadata";
|
} from "./messageMetadata";
|
||||||
import { insertAttachment, insertMessage } from "./messageStore";
|
import {
|
||||||
import type { AttachmentRecord, MessageRecord } from "./types";
|
insertAttachment,
|
||||||
|
upsertMessageForCapture,
|
||||||
|
updateMessageAsEdited,
|
||||||
|
updateMessageAsDeleted,
|
||||||
|
getMessageById,
|
||||||
|
} from "./messageStore";
|
||||||
|
import type {
|
||||||
|
AttachmentRecord,
|
||||||
|
MessageRecord,
|
||||||
|
ModerationBroadcaster,
|
||||||
|
} from "./types";
|
||||||
|
|
||||||
const logger = createChildLogger("message-capture");
|
const logger = createChildLogger("message-capture");
|
||||||
|
|
||||||
@@ -39,12 +46,14 @@ export async function captureMessage(
|
|||||||
metadata: JSON.stringify(metadata),
|
metadata: JSON.stringify(metadata),
|
||||||
};
|
};
|
||||||
|
|
||||||
await insertMessage(messageRecord);
|
await upsertMessageForCapture(messageRecord);
|
||||||
queueMessageAnalysis(message.id);
|
queueMessageAnalysis(message.id);
|
||||||
|
|
||||||
const broadcaster = globalThis as any;
|
const broadcaster = (globalThis as any).moderationBroadcaster as
|
||||||
if (broadcaster.broadcastMessageCreated) {
|
| ModerationBroadcaster
|
||||||
broadcaster.broadcastMessageCreated({
|
| undefined;
|
||||||
|
if (broadcaster) {
|
||||||
|
broadcaster.messageCreated({
|
||||||
...messageRecord,
|
...messageRecord,
|
||||||
type: "text",
|
type: "text",
|
||||||
});
|
});
|
||||||
@@ -72,14 +81,8 @@ export async function captureMessage(
|
|||||||
|
|
||||||
await insertAttachment(attachmentRecord);
|
await insertAttachment(attachmentRecord);
|
||||||
|
|
||||||
if (broadcaster.broadcastAttachmentUploaded) {
|
if (broadcaster) {
|
||||||
broadcaster.broadcastAttachmentUploaded({
|
broadcaster.attachmentCreated(attachmentRecord);
|
||||||
id: attachment.id,
|
|
||||||
message_id: message.id,
|
|
||||||
filename: attachment.name || "unknown",
|
|
||||||
channel_id: location.channelId,
|
|
||||||
created_at: Date.now(),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -118,16 +121,9 @@ export function registerMessageCapture(client: Client): void {
|
|||||||
if (newMessage.author?.bot) return;
|
if (newMessage.author?.bot) return;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const { updateMessageAsEdited } = await import("./messageStore");
|
const existing = await getMessageById(newMessage.id);
|
||||||
const db = getDatabase() as any;
|
|
||||||
|
|
||||||
const existing = await db
|
if (existing) {
|
||||||
.select()
|
|
||||||
.from(messagesTable)
|
|
||||||
.where(eq(messagesTable.id, newMessage.id))
|
|
||||||
.limit(1);
|
|
||||||
|
|
||||||
if (existing.length > 0) {
|
|
||||||
const editedAt = Date.now();
|
const editedAt = Date.now();
|
||||||
await updateMessageAsEdited(
|
await updateMessageAsEdited(
|
||||||
newMessage.id,
|
newMessage.id,
|
||||||
@@ -136,9 +132,11 @@ export function registerMessageCapture(client: Client): void {
|
|||||||
);
|
);
|
||||||
queueMessageAnalysis(newMessage.id);
|
queueMessageAnalysis(newMessage.id);
|
||||||
|
|
||||||
const broadcaster = globalThis as any;
|
const broadcaster = (globalThis as any).moderationBroadcaster as
|
||||||
if (broadcaster.broadcastMessageUpdated) {
|
| ModerationBroadcaster
|
||||||
broadcaster.broadcastMessageUpdated({
|
| undefined;
|
||||||
|
if (broadcaster) {
|
||||||
|
broadcaster.messageUpdated({
|
||||||
id: newMessage.id,
|
id: newMessage.id,
|
||||||
edited_content: getDisplayContent(newMessage as Message),
|
edited_content: getDisplayContent(newMessage as Message),
|
||||||
edited_at: editedAt,
|
edited_at: editedAt,
|
||||||
@@ -163,13 +161,14 @@ export function registerMessageCapture(client: Client): void {
|
|||||||
if (!message.author) return;
|
if (!message.author) return;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const { updateMessageAsDeleted } = await import("./messageStore");
|
|
||||||
const deletedAt = Date.now();
|
const deletedAt = Date.now();
|
||||||
await updateMessageAsDeleted(message.id, deletedAt);
|
await updateMessageAsDeleted(message.id, deletedAt);
|
||||||
|
|
||||||
const broadcaster = globalThis as any;
|
const broadcaster = (globalThis as any).moderationBroadcaster as
|
||||||
if (broadcaster.broadcastMessageDeleted) {
|
| ModerationBroadcaster
|
||||||
broadcaster.broadcastMessageDeleted({
|
| undefined;
|
||||||
|
if (broadcaster) {
|
||||||
|
broadcaster.messageDeleted({
|
||||||
id: message.id,
|
id: message.id,
|
||||||
deleted_at: deletedAt,
|
deleted_at: deletedAt,
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -55,6 +55,40 @@ export async function insertMessage(message: MessageRecord): Promise<void> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function upsertMessageForCapture(
|
||||||
|
message: MessageRecord,
|
||||||
|
): Promise<void> {
|
||||||
|
try {
|
||||||
|
const db = getDatabase() as any;
|
||||||
|
|
||||||
|
// Set ai_status to pending for new or recaptured/edited text
|
||||||
|
const messageWithAIStatus = {
|
||||||
|
...message,
|
||||||
|
ai_status: "pending" as const,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Try insert first (fast path for new messages)
|
||||||
|
await db
|
||||||
|
.insert(messagesTable)
|
||||||
|
.values(messageWithAIStatus)
|
||||||
|
.onConflictDoNothing();
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
{ messageId: message.id, channelId: message.channel_id },
|
||||||
|
"Message upserted for capture",
|
||||||
|
);
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
{
|
||||||
|
messageId: message.id,
|
||||||
|
error: error instanceof Error ? error.message : String(error),
|
||||||
|
},
|
||||||
|
"Failed to upsert message for capture",
|
||||||
|
);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function updateMessageAsEdited(
|
export async function updateMessageAsEdited(
|
||||||
messageId: string,
|
messageId: string,
|
||||||
editedContent: string,
|
editedContent: string,
|
||||||
|
|||||||
@@ -1,5 +1,9 @@
|
|||||||
|
import type { ModerationBroadcaster } from "./broadcaster";
|
||||||
|
|
||||||
export type AIStatus = "pending" | "clean" | "warn" | "flagged" | "error";
|
export type AIStatus = "pending" | "clean" | "warn" | "flagged" | "error";
|
||||||
|
|
||||||
|
export type { ModerationBroadcaster };
|
||||||
|
|
||||||
export interface MessageRecord {
|
export interface MessageRecord {
|
||||||
id: string;
|
id: string;
|
||||||
guild_id: string;
|
guild_id: string;
|
||||||
|
|||||||
Reference in New Issue
Block a user