feat: enhance moderation functionality with type improvements and global broadcaster integration
This commit is contained in:
@@ -9,10 +9,22 @@ import {
|
||||
getPendingMessagesByConversation,
|
||||
updateMessageAIAnalysis,
|
||||
} from "./messageStore";
|
||||
import type { AnalysisQueueStatus, MessageRecord } from "./types";
|
||||
import type {
|
||||
AnalysisQueueStatus,
|
||||
MessageRecord,
|
||||
ModerationBroadcaster,
|
||||
} from "./types";
|
||||
|
||||
const logger = createChildLogger("ai-analyzer");
|
||||
|
||||
type ModerationGlobal = typeof globalThis & {
|
||||
moderationBroadcaster?: ModerationBroadcaster;
|
||||
};
|
||||
|
||||
function getModerationBroadcaster(): ModerationBroadcaster | undefined {
|
||||
return (globalThis as ModerationGlobal).moderationBroadcaster;
|
||||
}
|
||||
|
||||
// Debounce state per conversation key
|
||||
const conversationDebounceTimers = new Map<string, NodeJS.Timeout>();
|
||||
// Track conversations currently being processed
|
||||
@@ -117,7 +129,7 @@ async function processBatch(
|
||||
|
||||
// Broadcast analyzed messages
|
||||
for (const row of analyzedRows) {
|
||||
(globalThis as any).moderationBroadcaster?.messageAnalyzed(row);
|
||||
getModerationBroadcaster()?.messageAnalyzed(row);
|
||||
}
|
||||
|
||||
// Clear error cooldown on success
|
||||
@@ -147,7 +159,7 @@ async function processBatch(
|
||||
error: lastError,
|
||||
});
|
||||
if (row) {
|
||||
(globalThis as any).moderationBroadcaster?.messageAnalyzed(row);
|
||||
getModerationBroadcaster()?.messageAnalyzed(row);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,12 +1,25 @@
|
||||
import type { Client, Message } from "discord.js-selfbot-v13";
|
||||
import type { Channel, Client, Message } from "discord.js-selfbot-v13";
|
||||
import { config } from "../config";
|
||||
import { createChildLogger } from "../logger";
|
||||
import { captureMessage } from "./messageCapture";
|
||||
|
||||
const logger = createChildLogger("backlog-sync");
|
||||
|
||||
type BacklogChannel = Channel & {
|
||||
messages: {
|
||||
fetch(options: { limit: number; before?: string }): Promise<{
|
||||
size: number;
|
||||
values(): IterableIterator<Message>;
|
||||
}>;
|
||||
};
|
||||
};
|
||||
|
||||
function hasMessageBacklog(channel: Channel): channel is BacklogChannel {
|
||||
return "messages" in channel;
|
||||
}
|
||||
|
||||
async function syncChannelMessages(
|
||||
channel: any,
|
||||
channel: BacklogChannel,
|
||||
cutoffTime: number,
|
||||
): Promise<number> {
|
||||
let before: string | undefined;
|
||||
@@ -77,6 +90,10 @@ export async function syncSelectedChannelBacklog(
|
||||
logger.warn({ guildId, channelId }, "Channel not found for backlog sync");
|
||||
return 0;
|
||||
}
|
||||
if (!hasMessageBacklog(channel)) {
|
||||
logger.warn({ guildId, channelId }, "Channel cannot fetch message backlog");
|
||||
return 0;
|
||||
}
|
||||
|
||||
const cutoffTime = Date.now() - config.BACKLOG_SYNC_HOURS * 60 * 60 * 1000;
|
||||
logger.info(
|
||||
@@ -85,7 +102,7 @@ export async function syncSelectedChannelBacklog(
|
||||
);
|
||||
|
||||
try {
|
||||
const count = await syncChannelMessages(channel as any, cutoffTime);
|
||||
const count = await syncChannelMessages(channel, cutoffTime);
|
||||
logger.info(
|
||||
{ channelId, count },
|
||||
"Backlog sync completed for selected channel",
|
||||
|
||||
@@ -7,11 +7,14 @@ import type {
|
||||
ModerationWsEvent,
|
||||
} from "./types";
|
||||
|
||||
type ClientLike = Pick<WebSocket, "readyState" | "send">;
|
||||
export type BroadcasterClient = Pick<WebSocket, "readyState" | "send">;
|
||||
|
||||
const log = createChildLogger("broadcaster");
|
||||
|
||||
function sendJson(clients: Set<ClientLike>, event: ModerationWsEvent): void {
|
||||
function sendJson(
|
||||
clients: Set<BroadcasterClient>,
|
||||
event: ModerationWsEvent,
|
||||
): void {
|
||||
const payload = JSON.stringify({ ...event, timestamp: Date.now() });
|
||||
for (const client of clients) {
|
||||
if (client.readyState === 1) {
|
||||
@@ -28,14 +31,14 @@ function sendJson(clients: Set<ClientLike>, event: ModerationWsEvent): void {
|
||||
}
|
||||
|
||||
export function createBroadcaster() {
|
||||
const clients = new Set<ClientLike>();
|
||||
const clients = new Set<BroadcasterClient>();
|
||||
|
||||
return {
|
||||
addClient(client: ClientLike) {
|
||||
addClient(client: BroadcasterClient) {
|
||||
clients.add(client);
|
||||
log.debug({ clientCount: clients.size }, "Client added");
|
||||
},
|
||||
removeClient(client: ClientLike) {
|
||||
removeClient(client: BroadcasterClient) {
|
||||
clients.delete(client);
|
||||
log.debug({ clientCount: clients.size }, "Client removed");
|
||||
},
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { and, asc, desc, eq, isNull, or, sql } from "drizzle-orm";
|
||||
import { and, asc, desc, eq, isNull, or, type SQL, sql } from "drizzle-orm";
|
||||
import { getDatabase } from "../database/drizzle";
|
||||
import { attachmentsTable, messagesTable } from "../database/schema";
|
||||
import { createChildLogger } from "../logger";
|
||||
@@ -11,6 +11,29 @@ import type {
|
||||
|
||||
const logger = createChildLogger("message-store");
|
||||
|
||||
interface QueryBuilder<T = unknown> extends PromiseLike<T> {
|
||||
from(...args: unknown[]): QueryBuilder<T>;
|
||||
where(...args: unknown[]): QueryBuilder<T>;
|
||||
orderBy(...args: unknown[]): QueryBuilder<T>;
|
||||
limit(...args: unknown[]): QueryBuilder<T>;
|
||||
offset(...args: unknown[]): QueryBuilder<T>;
|
||||
values(...args: unknown[]): QueryBuilder<T>;
|
||||
onConflictDoNothing(...args: unknown[]): QueryBuilder<T>;
|
||||
returning(...args: unknown[]): QueryBuilder<T>;
|
||||
set(...args: unknown[]): QueryBuilder<T>;
|
||||
}
|
||||
|
||||
interface MessageDatabase {
|
||||
select<T = unknown[]>(...args: unknown[]): QueryBuilder<T>;
|
||||
selectDistinct<T = unknown[]>(...args: unknown[]): QueryBuilder<T>;
|
||||
insert<T = unknown>(...args: unknown[]): QueryBuilder<T>;
|
||||
update(...args: unknown[]): QueryBuilder<unknown>;
|
||||
}
|
||||
|
||||
function db(): MessageDatabase {
|
||||
return getDatabase() as unknown as MessageDatabase;
|
||||
}
|
||||
|
||||
// Cursor helpers for pagination
|
||||
interface CursorData {
|
||||
created_at: number;
|
||||
@@ -36,8 +59,8 @@ export function decodeCursor(cursor?: string): CursorData | null {
|
||||
|
||||
export async function insertMessage(message: MessageRecord): Promise<void> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db.insert(messagesTable).values(message).onConflictDoNothing();
|
||||
const database = db();
|
||||
await database.insert(messagesTable).values(message).onConflictDoNothing();
|
||||
|
||||
logger.debug(
|
||||
{ messageId: message.id, channelId: message.channel_id },
|
||||
@@ -59,14 +82,14 @@ export async function upsertMessageForCapture(
|
||||
message: MessageRecord,
|
||||
): Promise<boolean> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const database = db();
|
||||
const messageWithAIStatus = {
|
||||
...message,
|
||||
ai_status: "pending" as const,
|
||||
};
|
||||
|
||||
const rows = await db
|
||||
.insert(messagesTable)
|
||||
const rows = await database
|
||||
.insert<Array<{ id: string }>>(messagesTable)
|
||||
.values(messageWithAIStatus)
|
||||
.onConflictDoNothing()
|
||||
.returning({ id: messagesTable.id });
|
||||
@@ -95,8 +118,8 @@ export async function updateMessageAsEdited(
|
||||
editedAt: number,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db
|
||||
const database = db();
|
||||
await database
|
||||
.update(messagesTable)
|
||||
.set({
|
||||
edited_content: editedContent,
|
||||
@@ -130,8 +153,8 @@ export async function updateMessageAsDeleted(
|
||||
deletedAt: number,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db
|
||||
const database = db();
|
||||
await database
|
||||
.update(messagesTable)
|
||||
.set({
|
||||
deleted_at: deletedAt,
|
||||
@@ -158,8 +181,8 @@ export async function getMessagesByChannel(
|
||||
offset: number = 0,
|
||||
): Promise<MessageRecord[]> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const rows = await db
|
||||
const database = db();
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(
|
||||
@@ -189,8 +212,11 @@ export async function insertAttachment(
|
||||
attachment: AttachmentRecord,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db.insert(attachmentsTable).values(attachment).onConflictDoNothing();
|
||||
const database = db();
|
||||
await database
|
||||
.insert(attachmentsTable)
|
||||
.values(attachment)
|
||||
.onConflictDoNothing();
|
||||
|
||||
logger.debug(
|
||||
{ attachmentId: attachment.id, messageId: attachment.message_id },
|
||||
@@ -214,8 +240,8 @@ export async function getAttachmentsByChannel(
|
||||
offset: number = 0,
|
||||
): Promise<AttachmentRecord[]> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const rows = await db
|
||||
const database = db();
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(attachmentsTable)
|
||||
.where(
|
||||
@@ -247,8 +273,8 @@ export async function updateAttachmentAsUploaded(
|
||||
uploadedAt: number,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db
|
||||
const database = db();
|
||||
await database
|
||||
.update(attachmentsTable)
|
||||
.set({
|
||||
uploaded_url: uploadedUrl,
|
||||
@@ -278,8 +304,8 @@ export async function updateAttachmentAsFailedUpload(
|
||||
error: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db
|
||||
const database = db();
|
||||
await database
|
||||
.update(attachmentsTable)
|
||||
.set({
|
||||
upload_status: "failed",
|
||||
@@ -315,8 +341,8 @@ export async function updateMessageAIAnalysis(
|
||||
result: AIAnalysisUpdate,
|
||||
): Promise<MessageRecord | null> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db
|
||||
const database = db();
|
||||
await database
|
||||
.update(messagesTable)
|
||||
.set({
|
||||
ai_status: result.status,
|
||||
@@ -329,7 +355,7 @@ export async function updateMessageAIAnalysis(
|
||||
})
|
||||
.where(eq(messagesTable.id, messageId));
|
||||
|
||||
const rows = await db
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(eq(messagesTable.id, messageId));
|
||||
@@ -351,8 +377,8 @@ export async function getPendingAIAnalysisMessages(
|
||||
limit: number = 25,
|
||||
): Promise<MessageRecord[]> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const rows = await db
|
||||
const database = db();
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(
|
||||
@@ -378,8 +404,8 @@ export async function getMessageById(
|
||||
messageId: string,
|
||||
): Promise<MessageRecord | null> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const rows = await db
|
||||
const database = db();
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(eq(messagesTable.id, messageId));
|
||||
@@ -401,8 +427,8 @@ export async function listMessages(
|
||||
query: MessageQuery,
|
||||
): Promise<PageResult<MessageRecord>> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const conditions: any[] = [];
|
||||
const database = db();
|
||||
const conditions: SQL[] = [];
|
||||
|
||||
// Apply filters
|
||||
if (query.guildId) {
|
||||
@@ -411,10 +437,7 @@ export async function listMessages(
|
||||
|
||||
if (query.channelId) {
|
||||
conditions.push(
|
||||
or(
|
||||
eq(messagesTable.channel_id, query.channelId),
|
||||
eq(messagesTable.thread_id, query.channelId),
|
||||
),
|
||||
sql`(${messagesTable.channel_id} = ${query.channelId} or ${messagesTable.thread_id} = ${query.channelId})`,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -427,11 +450,7 @@ export async function listMessages(
|
||||
}
|
||||
|
||||
if (query.status && query.status.length > 0) {
|
||||
conditions.push(
|
||||
or(
|
||||
...query.status.map((status) => eq(messagesTable.ai_status, status)),
|
||||
),
|
||||
);
|
||||
conditions.push(sql`${messagesTable.ai_status} in ${query.status}`);
|
||||
}
|
||||
|
||||
// Text search
|
||||
@@ -445,20 +464,14 @@ export async function listMessages(
|
||||
const cursorData = decodeCursor(query.cursor);
|
||||
if (cursorData) {
|
||||
conditions.push(
|
||||
or(
|
||||
sql`${messagesTable.created_at} < ${cursorData.created_at}`,
|
||||
and(
|
||||
eq(messagesTable.created_at, cursorData.created_at),
|
||||
sql`${messagesTable.id} < ${cursorData.id}`,
|
||||
),
|
||||
),
|
||||
sql`(${messagesTable.created_at} < ${cursorData.created_at} or (${messagesTable.created_at} = ${cursorData.created_at} and ${messagesTable.id} < ${cursorData.id}))`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch limit + 1 to determine if there's a next page
|
||||
const fetchLimit = query.limit + 1;
|
||||
const rows = await db
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(conditions.length > 0 ? and(...conditions) : undefined)
|
||||
@@ -506,7 +519,7 @@ export async function getConversationContextBefore(input: {
|
||||
limit: number;
|
||||
}): Promise<MessageRecord[]> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const database = db();
|
||||
const { channelId, threadId, beforeCreatedAt, limit } = input;
|
||||
|
||||
// Query same thread if threadId exists, otherwise channelId
|
||||
@@ -514,7 +527,7 @@ export async function getConversationContextBefore(input: {
|
||||
? eq(messagesTable.thread_id, threadId)
|
||||
: eq(messagesTable.channel_id, channelId);
|
||||
|
||||
const rows = await db
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(
|
||||
@@ -547,11 +560,11 @@ export async function getPendingMessagesByConversation(
|
||||
limit: number = 25,
|
||||
): Promise<MessageRecord[]> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const database = db();
|
||||
|
||||
// conversationKey is either thread_id or channel_id
|
||||
// Query both to safely handle the key
|
||||
const rows = await db
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(
|
||||
@@ -584,11 +597,11 @@ export async function getPendingConversationKeys(
|
||||
limit: number = 100,
|
||||
): Promise<string[]> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const database = db();
|
||||
|
||||
// Get distinct conversation keys (thread_id or channel_id) for pending messages
|
||||
const rows = await db
|
||||
.selectDistinct({
|
||||
const rows = await database
|
||||
.selectDistinct<Array<{ thread_id: string | null; channel_id: string }>>({
|
||||
thread_id: messagesTable.thread_id,
|
||||
channel_id: messagesTable.channel_id,
|
||||
})
|
||||
@@ -602,7 +615,7 @@ export async function getPendingConversationKeys(
|
||||
.limit(limit);
|
||||
|
||||
const keys: string[] = [];
|
||||
for (const row of rows as any[]) {
|
||||
for (const row of rows) {
|
||||
const key = row.thread_id || row.channel_id;
|
||||
if (key && !keys.includes(key)) {
|
||||
keys.push(key);
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import type { ModerationBroadcaster } from "./broadcaster";
|
||||
import type { BroadcasterClient, ModerationBroadcaster } from "./broadcaster";
|
||||
|
||||
export type AIStatus = "pending" | "clean" | "warn" | "flagged" | "error";
|
||||
|
||||
export type { ModerationBroadcaster };
|
||||
export type { BroadcasterClient, ModerationBroadcaster };
|
||||
|
||||
export interface MessageRecord {
|
||||
id: string;
|
||||
|
||||
Reference in New Issue
Block a user