feat: enhance message capture and processing with backlog support
This commit is contained in:
@@ -29,7 +29,7 @@ async function syncChannelMessages(
|
||||
continue;
|
||||
}
|
||||
|
||||
await captureMessage(message, "text");
|
||||
await captureMessage(message, "text", { source: "backlog" });
|
||||
synced++;
|
||||
}
|
||||
|
||||
|
||||
@@ -22,9 +22,18 @@ import type {
|
||||
|
||||
const logger = createChildLogger("message-capture");
|
||||
|
||||
type ModerationGlobal = typeof globalThis & {
|
||||
moderationBroadcaster?: ModerationBroadcaster;
|
||||
};
|
||||
|
||||
function getModerationBroadcaster(): ModerationBroadcaster | undefined {
|
||||
return (globalThis as ModerationGlobal).moderationBroadcaster;
|
||||
}
|
||||
|
||||
export async function captureMessage(
|
||||
message: Message,
|
||||
type: "text" | "edited" | "deleted",
|
||||
options: { source?: "live" | "backlog" } = {},
|
||||
): Promise<void> {
|
||||
const location = getMessageLocation(message);
|
||||
const metadata = getMessageMetadata(message);
|
||||
@@ -46,17 +55,19 @@ export async function captureMessage(
|
||||
metadata: JSON.stringify(metadata),
|
||||
};
|
||||
|
||||
await upsertMessageForCapture(messageRecord);
|
||||
queueMessageAnalysis(message.id);
|
||||
const inserted = await upsertMessageForCapture(messageRecord);
|
||||
if (!inserted) {
|
||||
return;
|
||||
}
|
||||
|
||||
const broadcaster = (globalThis as any).moderationBroadcaster as
|
||||
| ModerationBroadcaster
|
||||
| undefined;
|
||||
if (broadcaster) {
|
||||
broadcaster.messageCreated({
|
||||
...messageRecord,
|
||||
type: "text",
|
||||
});
|
||||
const isBacklog = options.source === "backlog";
|
||||
if (!isBacklog) {
|
||||
queueMessageAnalysis(message.id);
|
||||
}
|
||||
|
||||
const broadcaster = getModerationBroadcaster();
|
||||
if (broadcaster && !isBacklog) {
|
||||
broadcaster.messageCreated(messageRecord);
|
||||
}
|
||||
|
||||
if (message.attachments.size > 0) {
|
||||
@@ -132,9 +143,7 @@ export function registerMessageCapture(client: Client): void {
|
||||
);
|
||||
queueMessageAnalysis(newMessage.id);
|
||||
|
||||
const broadcaster = (globalThis as any).moderationBroadcaster as
|
||||
| ModerationBroadcaster
|
||||
| undefined;
|
||||
const broadcaster = getModerationBroadcaster();
|
||||
if (broadcaster) {
|
||||
broadcaster.messageUpdated({
|
||||
id: newMessage.id,
|
||||
@@ -164,9 +173,7 @@ export function registerMessageCapture(client: Client): void {
|
||||
const deletedAt = Date.now();
|
||||
await updateMessageAsDeleted(message.id, deletedAt);
|
||||
|
||||
const broadcaster = (globalThis as any).moderationBroadcaster as
|
||||
| ModerationBroadcaster
|
||||
| undefined;
|
||||
const broadcaster = getModerationBroadcaster();
|
||||
if (broadcaster) {
|
||||
broadcaster.messageDeleted({
|
||||
id: message.id,
|
||||
|
||||
@@ -57,26 +57,26 @@ export async function insertMessage(message: MessageRecord): Promise<void> {
|
||||
|
||||
export async function upsertMessageForCapture(
|
||||
message: MessageRecord,
|
||||
): Promise<void> {
|
||||
): Promise<boolean> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
|
||||
// Set ai_status to pending for new or recaptured/edited text
|
||||
const messageWithAIStatus = {
|
||||
...message,
|
||||
ai_status: "pending" as const,
|
||||
};
|
||||
|
||||
// Try insert first (fast path for new messages)
|
||||
await db
|
||||
const rows = await db
|
||||
.insert(messagesTable)
|
||||
.values(messageWithAIStatus)
|
||||
.onConflictDoNothing();
|
||||
.onConflictDoNothing()
|
||||
.returning({ id: messagesTable.id });
|
||||
|
||||
const inserted = rows.length > 0;
|
||||
logger.debug(
|
||||
{ messageId: message.id, channelId: message.channel_id },
|
||||
"Message upserted for capture",
|
||||
{ messageId: message.id, channelId: message.channel_id, inserted },
|
||||
inserted ? "Message inserted for capture" : "Message already captured",
|
||||
);
|
||||
return inserted;
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user