feat: add cursor-based message queries
This commit is contained in:
@@ -1,11 +1,43 @@
|
|||||||
import { and, asc, desc, eq, isNull, or } from "drizzle-orm";
|
import { and, asc, desc, eq, isNull, or, sql } from "drizzle-orm";
|
||||||
import { getDatabase } from "../database/drizzle";
|
import { getDatabase } from "../database/drizzle";
|
||||||
import { attachmentsTable, messagesTable } from "../database/schema";
|
import { attachmentsTable, messagesTable } from "../database/schema";
|
||||||
import { createChildLogger } from "../logger";
|
import { createChildLogger } from "../logger";
|
||||||
import type { AttachmentRecord, MessageRecord } from "./types";
|
import type {
|
||||||
|
AIStatus,
|
||||||
|
AttachmentRecord,
|
||||||
|
MessageQuery,
|
||||||
|
MessageRecord,
|
||||||
|
PageResult,
|
||||||
|
} from "./types";
|
||||||
|
|
||||||
const logger = createChildLogger("message-store");
|
const logger = createChildLogger("message-store");
|
||||||
|
|
||||||
|
// Cursor helpers for pagination
|
||||||
|
interface CursorData {
|
||||||
|
created_at: number;
|
||||||
|
id: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function encodeCursor(data: CursorData): string {
|
||||||
|
return Buffer.from(JSON.stringify(data)).toString("base64");
|
||||||
|
}
|
||||||
|
|
||||||
|
export function decodeCursor(cursor?: string): CursorData | null {
|
||||||
|
if (!cursor) return null;
|
||||||
|
try {
|
||||||
|
const data = JSON.parse(Buffer.from(cursor, "base64").toString("utf-8"));
|
||||||
|
if (
|
||||||
|
typeof data.created_at === "number" &&
|
||||||
|
typeof data.id === "string"
|
||||||
|
) {
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function insertMessage(message: MessageRecord): Promise<void> {
|
export async function insertMessage(message: MessageRecord): Promise<void> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const db = getDatabase() as any;
|
||||||
@@ -327,3 +359,103 @@ export async function getMessageById(
|
|||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function listMessages(
|
||||||
|
query: MessageQuery,
|
||||||
|
): Promise<PageResult<MessageRecord>> {
|
||||||
|
try {
|
||||||
|
const db = getDatabase() as any;
|
||||||
|
const conditions: any[] = [];
|
||||||
|
|
||||||
|
// Apply filters
|
||||||
|
if (query.guildId) {
|
||||||
|
conditions.push(eq(messagesTable.guild_id, query.guildId));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (query.channelId) {
|
||||||
|
conditions.push(
|
||||||
|
or(
|
||||||
|
eq(messagesTable.channel_id, query.channelId),
|
||||||
|
eq(messagesTable.thread_id, query.channelId),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (query.threadId) {
|
||||||
|
conditions.push(eq(messagesTable.thread_id, query.threadId));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (query.userId) {
|
||||||
|
conditions.push(eq(messagesTable.user_id, query.userId));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (query.status && query.status.length > 0) {
|
||||||
|
conditions.push(
|
||||||
|
or(...query.status.map((status) => eq(messagesTable.ai_status, status))),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Text search
|
||||||
|
if (query.q) {
|
||||||
|
const pattern = `%${query.q.toLowerCase()}%`;
|
||||||
|
conditions.push(sql`lower(${messagesTable.content}) like ${pattern}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cursor-based pagination (newest first)
|
||||||
|
if (query.cursor) {
|
||||||
|
const cursorData = decodeCursor(query.cursor);
|
||||||
|
if (cursorData) {
|
||||||
|
conditions.push(
|
||||||
|
or(
|
||||||
|
sql`${messagesTable.created_at} < ${cursorData.created_at}`,
|
||||||
|
and(
|
||||||
|
eq(messagesTable.created_at, cursorData.created_at),
|
||||||
|
sql`${messagesTable.id} < ${cursorData.id}`,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch limit + 1 to determine if there's a next page
|
||||||
|
const fetchLimit = query.limit + 1;
|
||||||
|
const rows = await db
|
||||||
|
.select()
|
||||||
|
.from(messagesTable)
|
||||||
|
.where(conditions.length > 0 ? and(...conditions) : undefined)
|
||||||
|
.orderBy(desc(messagesTable.created_at), desc(messagesTable.id))
|
||||||
|
.limit(fetchLimit);
|
||||||
|
|
||||||
|
const hasMore = rows.length > query.limit;
|
||||||
|
const data = (rows.slice(0, query.limit) as MessageRecord[]);
|
||||||
|
|
||||||
|
let nextCursor: string | null = null;
|
||||||
|
if (hasMore && data.length > 0) {
|
||||||
|
const lastItem = data[data.length - 1];
|
||||||
|
nextCursor = encodeCursor({
|
||||||
|
created_at: lastItem.created_at,
|
||||||
|
id: lastItem.id,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return { data, nextCursor };
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
{
|
||||||
|
query,
|
||||||
|
error: error instanceof Error ? error.message : String(error),
|
||||||
|
},
|
||||||
|
"Failed to list messages",
|
||||||
|
);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function listReviewMessages(
|
||||||
|
query: Omit<MessageQuery, "status">,
|
||||||
|
): Promise<PageResult<MessageRecord>> {
|
||||||
|
return listMessages({
|
||||||
|
...query,
|
||||||
|
status: ["warn", "flagged", "error"],
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
13
tests/moderation/messageStoreQueries.test.ts
Normal file
13
tests/moderation/messageStoreQueries.test.ts
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
import { decodeCursor, encodeCursor } from "../../src/moderation/messageStore";
|
||||||
|
|
||||||
|
describe("message cursor helpers", () => {
|
||||||
|
it("round-trips created_at and id", () => {
|
||||||
|
const cursor = encodeCursor({ created_at: 1710000000000, id: "abc" });
|
||||||
|
expect(decodeCursor(cursor)).toEqual({ created_at: 1710000000000, id: "abc" });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns null for invalid cursor", () => {
|
||||||
|
expect(decodeCursor("not-base64-json")).toBeNull();
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user