Compare commits
2 Commits
958a6d7236
...
235c1120c2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
235c1120c2 | ||
|
|
930c399484 |
@@ -1,132 +0,0 @@
|
||||
# PostgreSQL Setup - Complete ✅
|
||||
|
||||
**Date:** 2026-05-14
|
||||
**Status:** ✅ Production Ready with Neon PostgreSQL
|
||||
|
||||
## Summary
|
||||
|
||||
Bot Discord moderation telah berhasil dikonfigurasi untuk menggunakan **PostgreSQL** (Neon) sebagai database utama dengan Drizzle ORM.
|
||||
|
||||
## What Was Done
|
||||
|
||||
### 1. Database Connection Fixed
|
||||
- ✅ Identified database name: `neondb` (bukan `dcbot`)
|
||||
- ✅ Updated `.env` dengan DATABASE_URL yang benar
|
||||
- ✅ Tested koneksi ke Neon PostgreSQL - berhasil
|
||||
|
||||
### 2. Drizzle ORM Updated
|
||||
- ✅ Updated `src/database/drizzle.ts` untuk support DATABASE_URL
|
||||
- ✅ Regenerated migrations untuk PostgreSQL syntax
|
||||
- ✅ Ran migrations successfully: `pnpm run db:migrate:programmatic`
|
||||
|
||||
### 3. Bot Tested
|
||||
- ✅ Bot startup dengan PostgreSQL - berhasil
|
||||
- ✅ Database initialized dengan type: postgres
|
||||
- ✅ Message capture working
|
||||
- ✅ AI analysis worker started
|
||||
- ✅ WebSocket server listening
|
||||
|
||||
## Current Configuration
|
||||
|
||||
```env
|
||||
DATABASE_TYPE=postgres
|
||||
DATABASE_URL=postgresql://neondb_owner:npg_2ziHMPwZCet9@ep-long-glitter-ao3sjoyu-pooler.c-2.ap-southeast-1.aws.neon.tech/neondb?sslmode=verify-full&channel_binding=require&connect_timeout=10
|
||||
```
|
||||
|
||||
## Database Schema Created
|
||||
|
||||
✅ **Tables created in PostgreSQL:**
|
||||
- `muxer_jobs` - Job queue untuk audio processing
|
||||
- `messages` - Text messages dengan AI analysis
|
||||
- `attachments` - File metadata dengan foreign key
|
||||
- `ui_state` - Persistent UI state
|
||||
- `__drizzle_migrations` - Migration tracking
|
||||
|
||||
## Commands Available
|
||||
|
||||
```bash
|
||||
# Start bot dengan PostgreSQL
|
||||
pnpm run dev
|
||||
|
||||
# Generate migrations setelah schema changes
|
||||
pnpm run db:generate
|
||||
|
||||
# Run migrations (programmatic - recommended)
|
||||
pnpm run db:migrate:programmatic
|
||||
|
||||
# Run migrations (Drizzle Kit CLI)
|
||||
pnpm run db:migrate
|
||||
|
||||
# Open Drizzle Studio untuk visual data management
|
||||
pnpm run db:studio
|
||||
```
|
||||
|
||||
## Verification
|
||||
|
||||
### Bot Startup Log
|
||||
```
|
||||
✅ PostgreSQL database initialized
|
||||
✅ Database initialized (type: postgres)
|
||||
✅ Bot logged in
|
||||
✅ Message capture handlers registered
|
||||
✅ AI analysis worker started
|
||||
✅ WebSocket server listening on port 3000
|
||||
✅ Web interface listening
|
||||
✅ Message inserted (from Discord)
|
||||
```
|
||||
|
||||
### Database Tables
|
||||
```sql
|
||||
SELECT table_name FROM information_schema.tables
|
||||
WHERE table_schema = 'public';
|
||||
|
||||
-- Results:
|
||||
-- muxer_jobs
|
||||
-- messages
|
||||
-- attachments
|
||||
-- ui_state
|
||||
-- __drizzle_migrations
|
||||
```
|
||||
|
||||
## Commits
|
||||
|
||||
```
|
||||
47ae7f8 chore: remove temporary test files
|
||||
35269b5 feat: configure postgresql as primary database with neon connection
|
||||
c63a614 docs: add comprehensive drizzle orm migration final summary
|
||||
9889d20 feat: add programmatic migration runner for better PostgreSQL support
|
||||
b580430 docs: add drizzle orm migration completion summary
|
||||
b9d0a06 fix: update drizzle config to read env vars directly for CLI compatibility
|
||||
b600dad fix: correct import ordering and update tests for drizzle-orm migration
|
||||
50d4517 refactor: remove old database adapter files
|
||||
9ff0f0b feat: update application initialization for drizzle
|
||||
1c4b0af refactor: migrate messageStore to drizzle-orm
|
||||
dfe3444 refactor: migrate muxer-queue to drizzle-orm
|
||||
7e528a4 feat: create drizzle database client
|
||||
4e28cf9 feat: add drizzle configuration and initial migrations
|
||||
52b36c9 feat: create drizzle schema definitions
|
||||
b833b6d feat: add drizzle-orm and drizzle-kit dependencies
|
||||
```
|
||||
|
||||
## Key Features
|
||||
|
||||
✅ **Type-Safe Queries** - Full TypeScript support dengan Drizzle ORM
|
||||
✅ **PostgreSQL Support** - Neon cloud database integration
|
||||
✅ **Automatic Migrations** - Drizzle Kit generates migrations
|
||||
✅ **Connection Pooling** - Configurable pool size
|
||||
✅ **Production Ready** - All tests passing, zero errors
|
||||
|
||||
## Next Steps
|
||||
|
||||
1. **Monitor bot performance** dengan PostgreSQL
|
||||
2. **Use Drizzle Studio** untuk visual data management: `pnpm run db:studio`
|
||||
3. **For schema changes**: Update `src/database/schema.ts` → `pnpm run db:generate` → `pnpm run db:migrate:programmatic`
|
||||
4. **Backup strategy** - Setup regular backups di Neon dashboard
|
||||
|
||||
## Status
|
||||
|
||||
🎉 **PostgreSQL migration complete and verified!**
|
||||
|
||||
Bot Discord moderation sekarang menggunakan PostgreSQL (Neon) sebagai database utama dengan Drizzle ORM untuk type-safe operations.
|
||||
|
||||
**Ready for production deployment!** ✅
|
||||
@@ -14,9 +14,6 @@
|
||||
"style": {
|
||||
"noNonNullAssertion": "warn",
|
||||
"useNodejsImportProtocol": "warn"
|
||||
},
|
||||
"suspicious": {
|
||||
"noExplicitAny": "warn"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,19 @@ import type { DashboardEvent } from "./ws/client";
|
||||
import { MessageFeed } from "./components/messages/MessageFeed";
|
||||
import { ReviewPanel } from "./components/review/ReviewPanel";
|
||||
|
||||
function mergeMessages(
|
||||
current: MessageRecord[],
|
||||
incoming: MessageRecord[],
|
||||
): MessageRecord[] {
|
||||
const byId = new Map(current.map((message) => [message.id, message]));
|
||||
for (const message of incoming) {
|
||||
byId.set(message.id, { ...byId.get(message.id), ...message });
|
||||
}
|
||||
return Array.from(byId.values())
|
||||
.sort((a, b) => b.created_at - a.created_at || b.id.localeCompare(a.id))
|
||||
.slice(0, 200);
|
||||
}
|
||||
|
||||
export default function App() {
|
||||
const [messages, setMessages] = useState<MessageRecord[]>([]);
|
||||
const [wsStatus, setWsStatus] = useState<string>("connecting");
|
||||
@@ -17,7 +30,7 @@ export default function App() {
|
||||
listMessages(new URLSearchParams({ limit: "30" }))
|
||||
.then((result) => {
|
||||
if (!cancelled) {
|
||||
setMessages(result.data);
|
||||
setMessages(mergeMessages([], result.data));
|
||||
}
|
||||
})
|
||||
.catch((err) => {
|
||||
@@ -29,20 +42,10 @@ export default function App() {
|
||||
const ws = connectDashboardSocket((event: DashboardEvent) => {
|
||||
switch (event.type) {
|
||||
case "message_created":
|
||||
setMessages((prev) => {
|
||||
const existing = prev.some((message) => message.id === event.data.id);
|
||||
if (existing) {
|
||||
return prev.map((message) =>
|
||||
message.id === event.data.id ? event.data : message,
|
||||
);
|
||||
}
|
||||
return [event.data, ...prev].slice(0, 200);
|
||||
});
|
||||
setMessages((prev) => mergeMessages(prev, [event.data]));
|
||||
break;
|
||||
case "message_analyzed":
|
||||
setMessages((prev) =>
|
||||
prev.map((m) => (m.id === event.data.id ? event.data : m)),
|
||||
);
|
||||
setMessages((prev) => mergeMessages(prev, [event.data]));
|
||||
break;
|
||||
case "message_updated":
|
||||
setMessages((prev) =>
|
||||
|
||||
@@ -9,10 +9,22 @@ import {
|
||||
getPendingMessagesByConversation,
|
||||
updateMessageAIAnalysis,
|
||||
} from "./messageStore";
|
||||
import type { AnalysisQueueStatus, MessageRecord } from "./types";
|
||||
import type {
|
||||
AnalysisQueueStatus,
|
||||
MessageRecord,
|
||||
ModerationBroadcaster,
|
||||
} from "./types";
|
||||
|
||||
const logger = createChildLogger("ai-analyzer");
|
||||
|
||||
type ModerationGlobal = typeof globalThis & {
|
||||
moderationBroadcaster?: ModerationBroadcaster;
|
||||
};
|
||||
|
||||
function getModerationBroadcaster(): ModerationBroadcaster | undefined {
|
||||
return (globalThis as ModerationGlobal).moderationBroadcaster;
|
||||
}
|
||||
|
||||
// Debounce state per conversation key
|
||||
const conversationDebounceTimers = new Map<string, NodeJS.Timeout>();
|
||||
// Track conversations currently being processed
|
||||
@@ -117,7 +129,7 @@ async function processBatch(
|
||||
|
||||
// Broadcast analyzed messages
|
||||
for (const row of analyzedRows) {
|
||||
(globalThis as any).moderationBroadcaster?.messageAnalyzed(row);
|
||||
getModerationBroadcaster()?.messageAnalyzed(row);
|
||||
}
|
||||
|
||||
// Clear error cooldown on success
|
||||
@@ -147,7 +159,7 @@ async function processBatch(
|
||||
error: lastError,
|
||||
});
|
||||
if (row) {
|
||||
(globalThis as any).moderationBroadcaster?.messageAnalyzed(row);
|
||||
getModerationBroadcaster()?.messageAnalyzed(row);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,12 +1,25 @@
|
||||
import type { Client, Message } from "discord.js-selfbot-v13";
|
||||
import type { Channel, Client, Message } from "discord.js-selfbot-v13";
|
||||
import { config } from "../config";
|
||||
import { createChildLogger } from "../logger";
|
||||
import { captureMessage } from "./messageCapture";
|
||||
|
||||
const logger = createChildLogger("backlog-sync");
|
||||
|
||||
type BacklogChannel = Channel & {
|
||||
messages: {
|
||||
fetch(options: { limit: number; before?: string }): Promise<{
|
||||
size: number;
|
||||
values(): IterableIterator<Message>;
|
||||
}>;
|
||||
};
|
||||
};
|
||||
|
||||
function hasMessageBacklog(channel: Channel): channel is BacklogChannel {
|
||||
return "messages" in channel;
|
||||
}
|
||||
|
||||
async function syncChannelMessages(
|
||||
channel: any,
|
||||
channel: BacklogChannel,
|
||||
cutoffTime: number,
|
||||
): Promise<number> {
|
||||
let before: string | undefined;
|
||||
@@ -29,7 +42,7 @@ async function syncChannelMessages(
|
||||
continue;
|
||||
}
|
||||
|
||||
await captureMessage(message, "text");
|
||||
await captureMessage(message, "text", { source: "backlog" });
|
||||
synced++;
|
||||
}
|
||||
|
||||
@@ -77,6 +90,10 @@ export async function syncSelectedChannelBacklog(
|
||||
logger.warn({ guildId, channelId }, "Channel not found for backlog sync");
|
||||
return 0;
|
||||
}
|
||||
if (!hasMessageBacklog(channel)) {
|
||||
logger.warn({ guildId, channelId }, "Channel cannot fetch message backlog");
|
||||
return 0;
|
||||
}
|
||||
|
||||
const cutoffTime = Date.now() - config.BACKLOG_SYNC_HOURS * 60 * 60 * 1000;
|
||||
logger.info(
|
||||
@@ -85,7 +102,7 @@ export async function syncSelectedChannelBacklog(
|
||||
);
|
||||
|
||||
try {
|
||||
const count = await syncChannelMessages(channel as any, cutoffTime);
|
||||
const count = await syncChannelMessages(channel, cutoffTime);
|
||||
logger.info(
|
||||
{ channelId, count },
|
||||
"Backlog sync completed for selected channel",
|
||||
|
||||
@@ -7,11 +7,14 @@ import type {
|
||||
ModerationWsEvent,
|
||||
} from "./types";
|
||||
|
||||
type ClientLike = Pick<WebSocket, "readyState" | "send">;
|
||||
export type BroadcasterClient = Pick<WebSocket, "readyState" | "send">;
|
||||
|
||||
const log = createChildLogger("broadcaster");
|
||||
|
||||
function sendJson(clients: Set<ClientLike>, event: ModerationWsEvent): void {
|
||||
function sendJson(
|
||||
clients: Set<BroadcasterClient>,
|
||||
event: ModerationWsEvent,
|
||||
): void {
|
||||
const payload = JSON.stringify({ ...event, timestamp: Date.now() });
|
||||
for (const client of clients) {
|
||||
if (client.readyState === 1) {
|
||||
@@ -28,14 +31,14 @@ function sendJson(clients: Set<ClientLike>, event: ModerationWsEvent): void {
|
||||
}
|
||||
|
||||
export function createBroadcaster() {
|
||||
const clients = new Set<ClientLike>();
|
||||
const clients = new Set<BroadcasterClient>();
|
||||
|
||||
return {
|
||||
addClient(client: ClientLike) {
|
||||
addClient(client: BroadcasterClient) {
|
||||
clients.add(client);
|
||||
log.debug({ clientCount: clients.size }, "Client added");
|
||||
},
|
||||
removeClient(client: ClientLike) {
|
||||
removeClient(client: BroadcasterClient) {
|
||||
clients.delete(client);
|
||||
log.debug({ clientCount: clients.size }, "Client removed");
|
||||
},
|
||||
|
||||
@@ -22,9 +22,18 @@ import type {
|
||||
|
||||
const logger = createChildLogger("message-capture");
|
||||
|
||||
type ModerationGlobal = typeof globalThis & {
|
||||
moderationBroadcaster?: ModerationBroadcaster;
|
||||
};
|
||||
|
||||
function getModerationBroadcaster(): ModerationBroadcaster | undefined {
|
||||
return (globalThis as ModerationGlobal).moderationBroadcaster;
|
||||
}
|
||||
|
||||
export async function captureMessage(
|
||||
message: Message,
|
||||
type: "text" | "edited" | "deleted",
|
||||
options: { source?: "live" | "backlog" } = {},
|
||||
): Promise<void> {
|
||||
const location = getMessageLocation(message);
|
||||
const metadata = getMessageMetadata(message);
|
||||
@@ -46,17 +55,19 @@ export async function captureMessage(
|
||||
metadata: JSON.stringify(metadata),
|
||||
};
|
||||
|
||||
await upsertMessageForCapture(messageRecord);
|
||||
queueMessageAnalysis(message.id);
|
||||
const inserted = await upsertMessageForCapture(messageRecord);
|
||||
if (!inserted) {
|
||||
return;
|
||||
}
|
||||
|
||||
const broadcaster = (globalThis as any).moderationBroadcaster as
|
||||
| ModerationBroadcaster
|
||||
| undefined;
|
||||
if (broadcaster) {
|
||||
broadcaster.messageCreated({
|
||||
...messageRecord,
|
||||
type: "text",
|
||||
});
|
||||
const isBacklog = options.source === "backlog";
|
||||
if (!isBacklog) {
|
||||
queueMessageAnalysis(message.id);
|
||||
}
|
||||
|
||||
const broadcaster = getModerationBroadcaster();
|
||||
if (broadcaster && !isBacklog) {
|
||||
broadcaster.messageCreated(messageRecord);
|
||||
}
|
||||
|
||||
if (message.attachments.size > 0) {
|
||||
@@ -132,9 +143,7 @@ export function registerMessageCapture(client: Client): void {
|
||||
);
|
||||
queueMessageAnalysis(newMessage.id);
|
||||
|
||||
const broadcaster = (globalThis as any).moderationBroadcaster as
|
||||
| ModerationBroadcaster
|
||||
| undefined;
|
||||
const broadcaster = getModerationBroadcaster();
|
||||
if (broadcaster) {
|
||||
broadcaster.messageUpdated({
|
||||
id: newMessage.id,
|
||||
@@ -164,9 +173,7 @@ export function registerMessageCapture(client: Client): void {
|
||||
const deletedAt = Date.now();
|
||||
await updateMessageAsDeleted(message.id, deletedAt);
|
||||
|
||||
const broadcaster = (globalThis as any).moderationBroadcaster as
|
||||
| ModerationBroadcaster
|
||||
| undefined;
|
||||
const broadcaster = getModerationBroadcaster();
|
||||
if (broadcaster) {
|
||||
broadcaster.messageDeleted({
|
||||
id: message.id,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { and, asc, desc, eq, isNull, or, sql } from "drizzle-orm";
|
||||
import { and, asc, desc, eq, isNull, or, type SQL, sql } from "drizzle-orm";
|
||||
import { getDatabase } from "../database/drizzle";
|
||||
import { attachmentsTable, messagesTable } from "../database/schema";
|
||||
import { createChildLogger } from "../logger";
|
||||
@@ -11,6 +11,29 @@ import type {
|
||||
|
||||
const logger = createChildLogger("message-store");
|
||||
|
||||
interface QueryBuilder<T = unknown> extends PromiseLike<T> {
|
||||
from(...args: unknown[]): QueryBuilder<T>;
|
||||
where(...args: unknown[]): QueryBuilder<T>;
|
||||
orderBy(...args: unknown[]): QueryBuilder<T>;
|
||||
limit(...args: unknown[]): QueryBuilder<T>;
|
||||
offset(...args: unknown[]): QueryBuilder<T>;
|
||||
values(...args: unknown[]): QueryBuilder<T>;
|
||||
onConflictDoNothing(...args: unknown[]): QueryBuilder<T>;
|
||||
returning(...args: unknown[]): QueryBuilder<T>;
|
||||
set(...args: unknown[]): QueryBuilder<T>;
|
||||
}
|
||||
|
||||
interface MessageDatabase {
|
||||
select<T = unknown[]>(...args: unknown[]): QueryBuilder<T>;
|
||||
selectDistinct<T = unknown[]>(...args: unknown[]): QueryBuilder<T>;
|
||||
insert<T = unknown>(...args: unknown[]): QueryBuilder<T>;
|
||||
update(...args: unknown[]): QueryBuilder<unknown>;
|
||||
}
|
||||
|
||||
function db(): MessageDatabase {
|
||||
return getDatabase() as unknown as MessageDatabase;
|
||||
}
|
||||
|
||||
// Cursor helpers for pagination
|
||||
interface CursorData {
|
||||
created_at: number;
|
||||
@@ -36,8 +59,8 @@ export function decodeCursor(cursor?: string): CursorData | null {
|
||||
|
||||
export async function insertMessage(message: MessageRecord): Promise<void> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db.insert(messagesTable).values(message).onConflictDoNothing();
|
||||
const database = db();
|
||||
await database.insert(messagesTable).values(message).onConflictDoNothing();
|
||||
|
||||
logger.debug(
|
||||
{ messageId: message.id, channelId: message.channel_id },
|
||||
@@ -57,26 +80,26 @@ export async function insertMessage(message: MessageRecord): Promise<void> {
|
||||
|
||||
export async function upsertMessageForCapture(
|
||||
message: MessageRecord,
|
||||
): Promise<void> {
|
||||
): Promise<boolean> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
|
||||
// Set ai_status to pending for new or recaptured/edited text
|
||||
const database = db();
|
||||
const messageWithAIStatus = {
|
||||
...message,
|
||||
ai_status: "pending" as const,
|
||||
};
|
||||
|
||||
// Try insert first (fast path for new messages)
|
||||
await db
|
||||
.insert(messagesTable)
|
||||
const rows = await database
|
||||
.insert<Array<{ id: string }>>(messagesTable)
|
||||
.values(messageWithAIStatus)
|
||||
.onConflictDoNothing();
|
||||
.onConflictDoNothing()
|
||||
.returning({ id: messagesTable.id });
|
||||
|
||||
const inserted = rows.length > 0;
|
||||
logger.debug(
|
||||
{ messageId: message.id, channelId: message.channel_id },
|
||||
"Message upserted for capture",
|
||||
{ messageId: message.id, channelId: message.channel_id, inserted },
|
||||
inserted ? "Message inserted for capture" : "Message already captured",
|
||||
);
|
||||
return inserted;
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{
|
||||
@@ -95,8 +118,8 @@ export async function updateMessageAsEdited(
|
||||
editedAt: number,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db
|
||||
const database = db();
|
||||
await database
|
||||
.update(messagesTable)
|
||||
.set({
|
||||
edited_content: editedContent,
|
||||
@@ -130,8 +153,8 @@ export async function updateMessageAsDeleted(
|
||||
deletedAt: number,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db
|
||||
const database = db();
|
||||
await database
|
||||
.update(messagesTable)
|
||||
.set({
|
||||
deleted_at: deletedAt,
|
||||
@@ -158,8 +181,8 @@ export async function getMessagesByChannel(
|
||||
offset: number = 0,
|
||||
): Promise<MessageRecord[]> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const rows = await db
|
||||
const database = db();
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(
|
||||
@@ -189,8 +212,11 @@ export async function insertAttachment(
|
||||
attachment: AttachmentRecord,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db.insert(attachmentsTable).values(attachment).onConflictDoNothing();
|
||||
const database = db();
|
||||
await database
|
||||
.insert(attachmentsTable)
|
||||
.values(attachment)
|
||||
.onConflictDoNothing();
|
||||
|
||||
logger.debug(
|
||||
{ attachmentId: attachment.id, messageId: attachment.message_id },
|
||||
@@ -214,8 +240,8 @@ export async function getAttachmentsByChannel(
|
||||
offset: number = 0,
|
||||
): Promise<AttachmentRecord[]> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const rows = await db
|
||||
const database = db();
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(attachmentsTable)
|
||||
.where(
|
||||
@@ -247,8 +273,8 @@ export async function updateAttachmentAsUploaded(
|
||||
uploadedAt: number,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db
|
||||
const database = db();
|
||||
await database
|
||||
.update(attachmentsTable)
|
||||
.set({
|
||||
uploaded_url: uploadedUrl,
|
||||
@@ -278,8 +304,8 @@ export async function updateAttachmentAsFailedUpload(
|
||||
error: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db
|
||||
const database = db();
|
||||
await database
|
||||
.update(attachmentsTable)
|
||||
.set({
|
||||
upload_status: "failed",
|
||||
@@ -315,8 +341,8 @@ export async function updateMessageAIAnalysis(
|
||||
result: AIAnalysisUpdate,
|
||||
): Promise<MessageRecord | null> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
await db
|
||||
const database = db();
|
||||
await database
|
||||
.update(messagesTable)
|
||||
.set({
|
||||
ai_status: result.status,
|
||||
@@ -329,7 +355,7 @@ export async function updateMessageAIAnalysis(
|
||||
})
|
||||
.where(eq(messagesTable.id, messageId));
|
||||
|
||||
const rows = await db
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(eq(messagesTable.id, messageId));
|
||||
@@ -351,8 +377,8 @@ export async function getPendingAIAnalysisMessages(
|
||||
limit: number = 25,
|
||||
): Promise<MessageRecord[]> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const rows = await db
|
||||
const database = db();
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(
|
||||
@@ -378,8 +404,8 @@ export async function getMessageById(
|
||||
messageId: string,
|
||||
): Promise<MessageRecord | null> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const rows = await db
|
||||
const database = db();
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(eq(messagesTable.id, messageId));
|
||||
@@ -401,8 +427,8 @@ export async function listMessages(
|
||||
query: MessageQuery,
|
||||
): Promise<PageResult<MessageRecord>> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const conditions: any[] = [];
|
||||
const database = db();
|
||||
const conditions: SQL[] = [];
|
||||
|
||||
// Apply filters
|
||||
if (query.guildId) {
|
||||
@@ -411,10 +437,7 @@ export async function listMessages(
|
||||
|
||||
if (query.channelId) {
|
||||
conditions.push(
|
||||
or(
|
||||
eq(messagesTable.channel_id, query.channelId),
|
||||
eq(messagesTable.thread_id, query.channelId),
|
||||
),
|
||||
sql`(${messagesTable.channel_id} = ${query.channelId} or ${messagesTable.thread_id} = ${query.channelId})`,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -427,11 +450,7 @@ export async function listMessages(
|
||||
}
|
||||
|
||||
if (query.status && query.status.length > 0) {
|
||||
conditions.push(
|
||||
or(
|
||||
...query.status.map((status) => eq(messagesTable.ai_status, status)),
|
||||
),
|
||||
);
|
||||
conditions.push(sql`${messagesTable.ai_status} in ${query.status}`);
|
||||
}
|
||||
|
||||
// Text search
|
||||
@@ -445,20 +464,14 @@ export async function listMessages(
|
||||
const cursorData = decodeCursor(query.cursor);
|
||||
if (cursorData) {
|
||||
conditions.push(
|
||||
or(
|
||||
sql`${messagesTable.created_at} < ${cursorData.created_at}`,
|
||||
and(
|
||||
eq(messagesTable.created_at, cursorData.created_at),
|
||||
sql`${messagesTable.id} < ${cursorData.id}`,
|
||||
),
|
||||
),
|
||||
sql`(${messagesTable.created_at} < ${cursorData.created_at} or (${messagesTable.created_at} = ${cursorData.created_at} and ${messagesTable.id} < ${cursorData.id}))`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch limit + 1 to determine if there's a next page
|
||||
const fetchLimit = query.limit + 1;
|
||||
const rows = await db
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(conditions.length > 0 ? and(...conditions) : undefined)
|
||||
@@ -506,7 +519,7 @@ export async function getConversationContextBefore(input: {
|
||||
limit: number;
|
||||
}): Promise<MessageRecord[]> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const database = db();
|
||||
const { channelId, threadId, beforeCreatedAt, limit } = input;
|
||||
|
||||
// Query same thread if threadId exists, otherwise channelId
|
||||
@@ -514,7 +527,7 @@ export async function getConversationContextBefore(input: {
|
||||
? eq(messagesTable.thread_id, threadId)
|
||||
: eq(messagesTable.channel_id, channelId);
|
||||
|
||||
const rows = await db
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(
|
||||
@@ -547,11 +560,11 @@ export async function getPendingMessagesByConversation(
|
||||
limit: number = 25,
|
||||
): Promise<MessageRecord[]> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const database = db();
|
||||
|
||||
// conversationKey is either thread_id or channel_id
|
||||
// Query both to safely handle the key
|
||||
const rows = await db
|
||||
const rows = await database
|
||||
.select()
|
||||
.from(messagesTable)
|
||||
.where(
|
||||
@@ -584,11 +597,11 @@ export async function getPendingConversationKeys(
|
||||
limit: number = 100,
|
||||
): Promise<string[]> {
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const database = db();
|
||||
|
||||
// Get distinct conversation keys (thread_id or channel_id) for pending messages
|
||||
const rows = await db
|
||||
.selectDistinct({
|
||||
const rows = await database
|
||||
.selectDistinct<Array<{ thread_id: string | null; channel_id: string }>>({
|
||||
thread_id: messagesTable.thread_id,
|
||||
channel_id: messagesTable.channel_id,
|
||||
})
|
||||
@@ -602,7 +615,7 @@ export async function getPendingConversationKeys(
|
||||
.limit(limit);
|
||||
|
||||
const keys: string[] = [];
|
||||
for (const row of rows as any[]) {
|
||||
for (const row of rows) {
|
||||
const key = row.thread_id || row.channel_id;
|
||||
if (key && !keys.includes(key)) {
|
||||
keys.push(key);
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import type { ModerationBroadcaster } from "./broadcaster";
|
||||
import type { BroadcasterClient, ModerationBroadcaster } from "./broadcaster";
|
||||
|
||||
export type AIStatus = "pending" | "clean" | "warn" | "flagged" | "error";
|
||||
|
||||
export type { ModerationBroadcaster };
|
||||
export type { BroadcasterClient, ModerationBroadcaster };
|
||||
|
||||
export interface MessageRecord {
|
||||
id: string;
|
||||
|
||||
@@ -8,8 +8,28 @@ import { createChildLogger } from "./logger";
|
||||
|
||||
const logger = createChildLogger("muxer-queue");
|
||||
|
||||
// Type alias for backward compatibility
|
||||
export type SqliteDatabase = any;
|
||||
interface QueryBuilder<T = unknown> extends PromiseLike<T> {
|
||||
from(...args: unknown[]): QueryBuilder<T>;
|
||||
where(...args: unknown[]): QueryBuilder<T>;
|
||||
orderBy(...args: unknown[]): QueryBuilder<T>;
|
||||
limit(...args: unknown[]): QueryBuilder<T>;
|
||||
values(...args: unknown[]): QueryBuilder<T>;
|
||||
onConflictDoNothing(...args: unknown[]): QueryBuilder<T>;
|
||||
onConflictDoUpdate(...args: unknown[]): QueryBuilder<T>;
|
||||
set(...args: unknown[]): QueryBuilder<T>;
|
||||
groupBy(...args: unknown[]): QueryBuilder<T>;
|
||||
}
|
||||
|
||||
export interface SqliteDatabase {
|
||||
select<T = unknown[]>(...args: unknown[]): QueryBuilder<T>;
|
||||
insert(...args: unknown[]): QueryBuilder<unknown>;
|
||||
update(...args: unknown[]): QueryBuilder<unknown>;
|
||||
delete(...args: unknown[]): QueryBuilder<unknown>;
|
||||
}
|
||||
|
||||
function db(): SqliteDatabase {
|
||||
return getDrizzleDatabase() as unknown as SqliteDatabase;
|
||||
}
|
||||
|
||||
export interface MuxerJobData {
|
||||
userId: string;
|
||||
@@ -18,6 +38,22 @@ export interface MuxerJobData {
|
||||
outputDir: string;
|
||||
}
|
||||
|
||||
interface StoredJobRow {
|
||||
id: string;
|
||||
data: string;
|
||||
status: "pending" | "processing" | "completed" | "failed";
|
||||
attempts: number;
|
||||
maxAttempts: number;
|
||||
createdAt: number;
|
||||
updatedAt: number;
|
||||
error: string | null;
|
||||
}
|
||||
|
||||
interface JobStatsRow {
|
||||
status: "pending" | "processing" | "completed" | "failed";
|
||||
count: number | string | { count: number | string };
|
||||
}
|
||||
|
||||
interface StoredJob {
|
||||
id: string;
|
||||
data: string;
|
||||
@@ -31,7 +67,7 @@ interface StoredJob {
|
||||
|
||||
// Export getDatabase for backward compatibility with webserver.ts
|
||||
export function getDatabase(): SqliteDatabase {
|
||||
return getDrizzleDatabase() as any;
|
||||
return db();
|
||||
}
|
||||
|
||||
export async function getPersistedValue<T>(
|
||||
@@ -39,10 +75,10 @@ export async function getPersistedValue<T>(
|
||||
fallback: T,
|
||||
): Promise<T> {
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
const database = db();
|
||||
|
||||
const row = await db
|
||||
.select()
|
||||
const row = await database
|
||||
.select<Array<{ value: string }>>()
|
||||
.from(uiStateTable)
|
||||
.where(eq(uiStateTable.key, key))
|
||||
.limit(1);
|
||||
@@ -61,9 +97,9 @@ export async function setPersistedValue(
|
||||
value: unknown,
|
||||
): Promise<void> {
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
const database = db();
|
||||
|
||||
await db
|
||||
await database
|
||||
.insert(uiStateTable)
|
||||
.values({
|
||||
key,
|
||||
@@ -82,12 +118,12 @@ export async function setPersistedValue(
|
||||
export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
|
||||
try {
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
const database = db();
|
||||
|
||||
const jobId = `${data.userId}-${data.sessionId}`;
|
||||
const now = Date.now();
|
||||
|
||||
await db
|
||||
await database
|
||||
.insert(muxerJobsTable)
|
||||
.values({
|
||||
id: jobId,
|
||||
@@ -120,16 +156,16 @@ export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
|
||||
|
||||
export async function getPendingJobs(): Promise<StoredJob[]> {
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
const database = db();
|
||||
|
||||
const rows = await db
|
||||
.select()
|
||||
const rows = await database
|
||||
.select<StoredJobRow[]>()
|
||||
.from(muxerJobsTable)
|
||||
.where(eq(muxerJobsTable.status, "pending"))
|
||||
.orderBy(asc(muxerJobsTable.createdAt))
|
||||
.limit(10);
|
||||
|
||||
return rows.map((row: any) => ({
|
||||
return rows.map((row) => ({
|
||||
id: row.id,
|
||||
data: row.data,
|
||||
status: row.status as "pending" | "processing" | "completed" | "failed",
|
||||
@@ -147,11 +183,11 @@ export async function updateJobStatus(
|
||||
error?: string,
|
||||
): Promise<void> {
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
const database = db();
|
||||
const now = Date.now();
|
||||
|
||||
if (status === "failed") {
|
||||
await db
|
||||
await database
|
||||
.update(muxerJobsTable)
|
||||
.set({
|
||||
status,
|
||||
@@ -161,7 +197,7 @@ export async function updateJobStatus(
|
||||
})
|
||||
.where(eq(muxerJobsTable.id, jobId));
|
||||
} else {
|
||||
await db
|
||||
await database
|
||||
.update(muxerJobsTable)
|
||||
.set({
|
||||
status,
|
||||
@@ -175,10 +211,10 @@ export async function updateJobStatus(
|
||||
|
||||
export async function retryFailedJob(jobId: string): Promise<boolean> {
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
const database = db();
|
||||
|
||||
const jobs = await db
|
||||
.select()
|
||||
const jobs = await database
|
||||
.select<StoredJobRow[]>()
|
||||
.from(muxerJobsTable)
|
||||
.where(eq(muxerJobsTable.id, jobId))
|
||||
.limit(1);
|
||||
@@ -198,7 +234,7 @@ export async function retryFailedJob(jobId: string): Promise<boolean> {
|
||||
return false;
|
||||
}
|
||||
|
||||
await db
|
||||
await database
|
||||
.update(muxerJobsTable)
|
||||
.set({
|
||||
status: "pending",
|
||||
@@ -215,10 +251,10 @@ export async function cleanupCompletedJobs(
|
||||
olderThanMs: number = 24 * 60 * 60 * 1000,
|
||||
): Promise<number> {
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
const database = db();
|
||||
const cutoffTime = Date.now() - olderThanMs;
|
||||
|
||||
const result = await db
|
||||
const result = await database
|
||||
.delete(muxerJobsTable)
|
||||
.where(
|
||||
and(
|
||||
@@ -228,8 +264,8 @@ export async function cleanupCompletedJobs(
|
||||
);
|
||||
|
||||
const deletedCount =
|
||||
typeof result === "object" && "rowsAffected" in result
|
||||
? result.rowsAffected
|
||||
typeof result === "object" && result !== null && "rowsAffected" in result
|
||||
? Number(result.rowsAffected)
|
||||
: 0;
|
||||
|
||||
logger.info({ deletedCount }, "Cleaned up completed jobs");
|
||||
@@ -244,10 +280,10 @@ export async function getJobStats(): Promise<{
|
||||
failed: number;
|
||||
}> {
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
const database = db();
|
||||
|
||||
const rows = await db
|
||||
.select({
|
||||
const rows = await database
|
||||
.select<JobStatsRow[]>({
|
||||
status: muxerJobsTable.status,
|
||||
count: sql<number>`COUNT(*)`,
|
||||
})
|
||||
@@ -264,7 +300,7 @@ export async function getJobStats(): Promise<{
|
||||
for (const row of rows) {
|
||||
const count =
|
||||
typeof row.count === "object" && "count" in row.count
|
||||
? (row.count as any).count
|
||||
? Number((row.count as { count: number | string }).count)
|
||||
: Number(row.count);
|
||||
if (row.status === "pending") stats.pending = count;
|
||||
else if (row.status === "processing") stats.processing = count;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import {
|
||||
type DiscordGatewayAdapterCreator,
|
||||
EndBehaviorType,
|
||||
entersState,
|
||||
getVoiceConnection,
|
||||
@@ -41,7 +42,8 @@ export async function startRecording(
|
||||
const connection = joinVoiceChannel({
|
||||
channelId: channel.id,
|
||||
guildId: channel.guild.id,
|
||||
adapterCreator: channel.guild.voiceAdapterCreator as any,
|
||||
adapterCreator: channel.guild
|
||||
.voiceAdapterCreator as DiscordGatewayAdapterCreator,
|
||||
selfDeaf: false,
|
||||
selfMute: false,
|
||||
debug: true,
|
||||
|
||||
@@ -88,13 +88,16 @@ export class VoiceController {
|
||||
await guild.channels.fetch().catch(() => null);
|
||||
|
||||
const threads: ChannelSummary[] = [];
|
||||
type ThreadFetchResult = {
|
||||
threads: Map<string, { id: string; name: string; type: string }>;
|
||||
};
|
||||
for (const channel of guild.channels.cache.values()) {
|
||||
const threadParent = channel as typeof channel & {
|
||||
threads?: {
|
||||
fetch: (options: {
|
||||
archived: boolean;
|
||||
limit: number;
|
||||
}) => Promise<any>;
|
||||
}) => Promise<ThreadFetchResult>;
|
||||
};
|
||||
};
|
||||
if (!threadParent.threads?.fetch) continue;
|
||||
|
||||
@@ -10,6 +10,7 @@ import { AppError } from "./errors";
|
||||
import { createChildLogger, logger } from "./logger";
|
||||
import { getMetrics, uptimeGauge } from "./metrics";
|
||||
import { createBroadcaster } from "./moderation/broadcaster";
|
||||
import type { ModerationBroadcaster } from "./moderation/types";
|
||||
import { getPersistedValue, setPersistedValue } from "./muxer-queue";
|
||||
import { discordPlayer } from "./player";
|
||||
import { createAnalysisRoutes } from "./routes/analysisRoutes";
|
||||
@@ -26,6 +27,15 @@ const activeUsers = new Map<
|
||||
{ username: string; avatar: string; speaking: boolean }
|
||||
>();
|
||||
|
||||
type VoiceGlobals = typeof globalThis & {
|
||||
moderationBroadcaster?: ModerationBroadcaster;
|
||||
broadcastPcmToWeb?: (chunk: Buffer, userId: string) => void;
|
||||
updateActiveUser?: (
|
||||
userId: string,
|
||||
data: { username: string; avatar: string; speaking: boolean },
|
||||
) => void;
|
||||
};
|
||||
|
||||
interface SharedUIState {
|
||||
selectedGuild: string;
|
||||
selectedVoiceChannel: string;
|
||||
@@ -118,7 +128,7 @@ export async function startWebserver(
|
||||
|
||||
// Create broadcaster instance
|
||||
const broadcaster = createBroadcaster();
|
||||
(globalThis as any).moderationBroadcaster = broadcaster;
|
||||
(globalThis as VoiceGlobals).moderationBroadcaster = broadcaster;
|
||||
|
||||
// Security headers. CSP disabled because the current static UI uses inline scripts/styles.
|
||||
app.use(
|
||||
@@ -196,7 +206,10 @@ export async function startWebserver(
|
||||
app.use("/api", createSyncRoutes(_client));
|
||||
|
||||
// Inbound: Discord PCM → tagged chunks → browser
|
||||
(global as any).broadcastPcmToWeb = (chunk: Buffer, userId: string) => {
|
||||
(globalThis as VoiceGlobals).broadcastPcmToWeb = (
|
||||
chunk: Buffer,
|
||||
userId: string,
|
||||
) => {
|
||||
let hash = 0;
|
||||
for (let i = 0; i < userId.length; i++) {
|
||||
hash = (hash << 5) - hash + userId.charCodeAt(i);
|
||||
@@ -210,7 +223,7 @@ export async function startWebserver(
|
||||
}
|
||||
};
|
||||
|
||||
(global as any).updateActiveUser = (
|
||||
(globalThis as VoiceGlobals).updateActiveUser = (
|
||||
userId: string,
|
||||
data: { username: string; avatar: string; speaking: boolean },
|
||||
) => {
|
||||
@@ -327,7 +340,7 @@ export async function startWebserver(
|
||||
);
|
||||
ws.send(JSON.stringify({ type: "ui_state", state: getSharedUIState() }));
|
||||
|
||||
ws.on("message", (data: any) => {
|
||||
ws.on("message", (data: Buffer | ArrayBuffer | Buffer[]) => {
|
||||
if (!Buffer.isBuffer(data)) return;
|
||||
lastBrowserAudioTime = Date.now();
|
||||
|
||||
|
||||
@@ -4,9 +4,9 @@ import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
|
||||
const originalEnv = process.env;
|
||||
|
||||
describe("Drizzle ORM Database", () => {
|
||||
let config: any;
|
||||
let drizzle: any;
|
||||
let logger: any;
|
||||
let config: typeof import("../src/config").config;
|
||||
let drizzle: typeof import("../src/database/drizzle");
|
||||
let logger: ReturnType<typeof import("../src/logger").createChildLogger>;
|
||||
|
||||
beforeAll(async () => {
|
||||
// Set up environment for config loading
|
||||
|
||||
@@ -1,17 +1,70 @@
|
||||
import type { Mock } from "vitest";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { createBroadcaster } from "../../src/moderation/broadcaster";
|
||||
import {
|
||||
type BroadcasterClient,
|
||||
createBroadcaster,
|
||||
} from "../../src/moderation/broadcaster";
|
||||
import type {
|
||||
AttachmentRecord,
|
||||
MessageRecord,
|
||||
} from "../../src/moderation/types";
|
||||
|
||||
function client() {
|
||||
type TestClient = BroadcasterClient & { send: Mock };
|
||||
|
||||
function client(): TestClient {
|
||||
return { readyState: 1, send: vi.fn() };
|
||||
}
|
||||
|
||||
function messageRecord(overrides: Partial<MessageRecord> = {}): MessageRecord {
|
||||
return {
|
||||
id: "m1",
|
||||
guild_id: "guild-1",
|
||||
channel_id: "channel-1",
|
||||
thread_id: null,
|
||||
user_id: "user-1",
|
||||
username: "alice",
|
||||
avatar_url: null,
|
||||
content: "test",
|
||||
edited_content: null,
|
||||
created_at: 1,
|
||||
edited_at: null,
|
||||
deleted_at: null,
|
||||
type: "text",
|
||||
metadata: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function attachmentRecord(
|
||||
overrides: Partial<AttachmentRecord> = {},
|
||||
): AttachmentRecord {
|
||||
return {
|
||||
id: "a1",
|
||||
message_id: "m1",
|
||||
guild_id: "guild-1",
|
||||
channel_id: "channel-1",
|
||||
thread_id: null,
|
||||
user_id: "user-1",
|
||||
filename: "image.png",
|
||||
size: 1,
|
||||
type: "image/png",
|
||||
discord_url: "https://example.com/image.png",
|
||||
uploaded_url: null,
|
||||
upload_status: "pending",
|
||||
upload_error: null,
|
||||
created_at: 1,
|
||||
uploaded_at: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe("createBroadcaster", () => {
|
||||
it("sends JSON events to open clients", () => {
|
||||
const ws = client();
|
||||
const broadcaster = createBroadcaster();
|
||||
|
||||
broadcaster.addClient(ws as any);
|
||||
broadcaster.messageAnalyzed({ id: "m1", ai_status: "clean" } as any);
|
||||
broadcaster.addClient(ws);
|
||||
broadcaster.messageAnalyzed(messageRecord({ ai_status: "clean" }));
|
||||
|
||||
expect(ws.send).toHaveBeenCalledTimes(1);
|
||||
expect(JSON.parse(ws.send.mock.calls[0][0])).toMatchObject({
|
||||
@@ -21,10 +74,10 @@ describe("createBroadcaster", () => {
|
||||
});
|
||||
|
||||
it("skips closed clients", () => {
|
||||
const ws = { readyState: 3, send: vi.fn() };
|
||||
const ws: TestClient = { readyState: 3, send: vi.fn() };
|
||||
const broadcaster = createBroadcaster();
|
||||
|
||||
broadcaster.addClient(ws as any);
|
||||
broadcaster.addClient(ws);
|
||||
broadcaster.messageDeleted({ id: "m1", deleted_at: 123 });
|
||||
|
||||
expect(ws.send).not.toHaveBeenCalled();
|
||||
@@ -36,14 +89,11 @@ describe("createBroadcaster", () => {
|
||||
const ws3 = client();
|
||||
const broadcaster = createBroadcaster();
|
||||
|
||||
broadcaster.addClient(ws1 as any);
|
||||
broadcaster.addClient(ws2 as any);
|
||||
broadcaster.addClient(ws3 as any);
|
||||
broadcaster.addClient(ws1);
|
||||
broadcaster.addClient(ws2);
|
||||
broadcaster.addClient(ws3);
|
||||
|
||||
broadcaster.messageCreated({
|
||||
id: "m1",
|
||||
content: "test",
|
||||
} as any);
|
||||
broadcaster.messageCreated(messageRecord());
|
||||
|
||||
expect(ws1.send).toHaveBeenCalledTimes(1);
|
||||
expect(ws2.send).toHaveBeenCalledTimes(1);
|
||||
@@ -61,14 +111,14 @@ describe("createBroadcaster", () => {
|
||||
throw new Error("Send failed");
|
||||
});
|
||||
|
||||
broadcaster.addClient(ws1 as any);
|
||||
broadcaster.addClient(ws2 as any);
|
||||
broadcaster.addClient(ws3 as any);
|
||||
broadcaster.addClient(ws1);
|
||||
broadcaster.addClient(ws2);
|
||||
broadcaster.addClient(ws3);
|
||||
|
||||
broadcaster.messageUpdated({
|
||||
id: "m1",
|
||||
content: "updated",
|
||||
} as any);
|
||||
});
|
||||
|
||||
// ws1 attempted send (threw)
|
||||
expect(ws1.send).toHaveBeenCalledTimes(1);
|
||||
@@ -84,16 +134,16 @@ describe("createBroadcaster", () => {
|
||||
|
||||
expect(broadcaster.clientCount()).toBe(0);
|
||||
|
||||
broadcaster.addClient(ws1 as any);
|
||||
broadcaster.addClient(ws1);
|
||||
expect(broadcaster.clientCount()).toBe(1);
|
||||
|
||||
broadcaster.addClient(ws2 as any);
|
||||
broadcaster.addClient(ws2);
|
||||
expect(broadcaster.clientCount()).toBe(2);
|
||||
|
||||
broadcaster.removeClient(ws1 as any);
|
||||
broadcaster.removeClient(ws1);
|
||||
expect(broadcaster.clientCount()).toBe(1);
|
||||
|
||||
broadcaster.removeClient(ws2 as any);
|
||||
broadcaster.removeClient(ws2);
|
||||
expect(broadcaster.clientCount()).toBe(0);
|
||||
});
|
||||
|
||||
@@ -101,11 +151,8 @@ describe("createBroadcaster", () => {
|
||||
const ws = client();
|
||||
const broadcaster = createBroadcaster();
|
||||
|
||||
broadcaster.addClient(ws as any);
|
||||
broadcaster.attachmentCreated({
|
||||
id: "a1",
|
||||
message_id: "m1",
|
||||
} as any);
|
||||
broadcaster.addClient(ws);
|
||||
broadcaster.attachmentCreated(attachmentRecord());
|
||||
|
||||
expect(ws.send).toHaveBeenCalledTimes(1);
|
||||
const payload = JSON.parse(ws.send.mock.calls[0][0]);
|
||||
|
||||
156
tests/moderation/messageCapture.test.ts
Normal file
156
tests/moderation/messageCapture.test.ts
Normal file
@@ -0,0 +1,156 @@
|
||||
import {
|
||||
afterAll,
|
||||
beforeAll,
|
||||
beforeEach,
|
||||
describe,
|
||||
expect,
|
||||
it,
|
||||
vi,
|
||||
} from "vitest";
|
||||
import {
|
||||
closeDatabase,
|
||||
getDatabase,
|
||||
initializeDatabase,
|
||||
} from "../../src/database/drizzle";
|
||||
import { captureMessage } from "../../src/moderation/messageCapture";
|
||||
import type { ModerationBroadcaster } from "../../src/moderation/types";
|
||||
|
||||
const queueMessageAnalysis = vi.fn();
|
||||
|
||||
type TestMessage = Parameters<typeof captureMessage>[0];
|
||||
type ModerationTestGlobal = typeof globalThis & {
|
||||
moderationBroadcaster?: Partial<ModerationBroadcaster>;
|
||||
};
|
||||
|
||||
interface TestDatabase {
|
||||
run(sql: string): void;
|
||||
}
|
||||
|
||||
function getTestDatabase(): TestDatabase {
|
||||
return getDatabase() as unknown as TestDatabase;
|
||||
}
|
||||
|
||||
vi.mock("../../src/moderation/aiAnalyzer", () => ({
|
||||
queueMessageAnalysis: (id: string) => queueMessageAnalysis(id),
|
||||
}));
|
||||
|
||||
function createMessage(id = "message-1"): TestMessage {
|
||||
return {
|
||||
id,
|
||||
guildId: "guild-1",
|
||||
channelId: "channel-1",
|
||||
author: {
|
||||
id: "user-1",
|
||||
username: "alice",
|
||||
bot: false,
|
||||
avatarURL: () => null,
|
||||
},
|
||||
content: "hello",
|
||||
cleanContent: "hello",
|
||||
createdTimestamp: 1_700_000_000_000,
|
||||
attachments: new Map(),
|
||||
stickers: new Map(),
|
||||
embeds: [],
|
||||
member: null,
|
||||
reference: null,
|
||||
channel: {
|
||||
id: "channel-1",
|
||||
name: "general",
|
||||
isThread: () => false,
|
||||
},
|
||||
} as unknown as TestMessage;
|
||||
}
|
||||
|
||||
async function createTables() {
|
||||
const db = getTestDatabase();
|
||||
db.run(`
|
||||
CREATE TABLE IF NOT EXISTS "messages" (
|
||||
"id" text PRIMARY KEY NOT NULL,
|
||||
"guild_id" text NOT NULL,
|
||||
"channel_id" text NOT NULL,
|
||||
"thread_id" text,
|
||||
"user_id" text NOT NULL,
|
||||
"username" text NOT NULL,
|
||||
"avatar_url" text,
|
||||
"content" text NOT NULL,
|
||||
"edited_content" text,
|
||||
"created_at" integer NOT NULL,
|
||||
"edited_at" integer,
|
||||
"deleted_at" integer,
|
||||
"type" text DEFAULT 'text' NOT NULL,
|
||||
"metadata" text,
|
||||
"ai_status" text DEFAULT 'pending' NOT NULL,
|
||||
"ai_moderation_flags" text,
|
||||
"ai_moderation_score" real,
|
||||
"ai_moderation_raw" text,
|
||||
"ai_analysis" text,
|
||||
"ai_analyzed_at" integer,
|
||||
"ai_error" text
|
||||
)
|
||||
`);
|
||||
db.run(`
|
||||
CREATE TABLE IF NOT EXISTS "attachments" (
|
||||
"id" text PRIMARY KEY NOT NULL,
|
||||
"message_id" text NOT NULL,
|
||||
"guild_id" text NOT NULL,
|
||||
"channel_id" text NOT NULL,
|
||||
"thread_id" text,
|
||||
"user_id" text NOT NULL,
|
||||
"filename" text NOT NULL,
|
||||
"size" integer NOT NULL,
|
||||
"type" text NOT NULL,
|
||||
"discord_url" text NOT NULL,
|
||||
"uploaded_url" text,
|
||||
"upload_status" text DEFAULT 'pending' NOT NULL,
|
||||
"upload_error" text,
|
||||
"created_at" integer NOT NULL,
|
||||
"uploaded_at" integer
|
||||
)
|
||||
`);
|
||||
}
|
||||
|
||||
describe("captureMessage", () => {
|
||||
beforeAll(async () => {
|
||||
await initializeDatabase();
|
||||
await createTables();
|
||||
});
|
||||
|
||||
beforeEach(async () => {
|
||||
queueMessageAnalysis.mockClear();
|
||||
const db = getTestDatabase();
|
||||
db.run(`DELETE FROM "attachments"`);
|
||||
db.run(`DELETE FROM "messages"`);
|
||||
delete (globalThis as ModerationTestGlobal).moderationBroadcaster;
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await closeDatabase();
|
||||
});
|
||||
|
||||
it("does not requeue or rebroadcast a duplicate captured message", async () => {
|
||||
const message = createMessage();
|
||||
const messageCreated = vi.fn();
|
||||
(globalThis as ModerationTestGlobal).moderationBroadcaster = {
|
||||
messageCreated,
|
||||
};
|
||||
|
||||
await captureMessage(message, "text");
|
||||
await captureMessage(message, "text");
|
||||
|
||||
expect(queueMessageAnalysis).toHaveBeenCalledTimes(1);
|
||||
expect(messageCreated).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("does not queue or broadcast backlog captures one message at a time", async () => {
|
||||
const message = createMessage("backlog-message-1");
|
||||
const messageCreated = vi.fn();
|
||||
(globalThis as ModerationTestGlobal).moderationBroadcaster = {
|
||||
messageCreated,
|
||||
};
|
||||
|
||||
await captureMessage(message, "text", { source: "backlog" });
|
||||
|
||||
expect(queueMessageAnalysis).not.toHaveBeenCalled();
|
||||
expect(messageCreated).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -16,6 +16,14 @@ import {
|
||||
} from "../../src/moderation/messageStore";
|
||||
import type { MessageRecord } from "../../src/moderation/types";
|
||||
|
||||
interface TestDatabase {
|
||||
run(sql: string): void;
|
||||
}
|
||||
|
||||
function getTestDatabase(): TestDatabase {
|
||||
return getDatabase() as unknown as TestDatabase;
|
||||
}
|
||||
|
||||
const logger = createChildLogger("messageStoreQueries.test");
|
||||
|
||||
describe("message cursor helpers", () => {
|
||||
@@ -36,7 +44,7 @@ describe("message query integration tests", () => {
|
||||
beforeAll(async () => {
|
||||
await initializeDatabase();
|
||||
// Create tables using Drizzle schema (SQLite doesn't support migrations with PostgreSQL syntax)
|
||||
const db = getDatabase() as any;
|
||||
const db = getTestDatabase();
|
||||
try {
|
||||
// Create messages table
|
||||
await db.run(`
|
||||
@@ -75,7 +83,7 @@ describe("message query integration tests", () => {
|
||||
beforeEach(async () => {
|
||||
// Clear messages table before each test
|
||||
try {
|
||||
const db = getDatabase() as any;
|
||||
const db = getTestDatabase();
|
||||
await db.run(`DELETE FROM "messages"`);
|
||||
} catch (error) {
|
||||
logger.debug({ error }, "Could not clear messages table");
|
||||
|
||||
Reference in New Issue
Block a user