From 80081a6f62acca3bdece26d001c881fd736c1d0a Mon Sep 17 00:00:00 2001 From: MythEclipse Date: Wed, 13 May 2026 16:00:21 +0700 Subject: [PATCH] refactor: modularize recorder orchestration --- src/recorder.ts | 332 +++++++++++++----------------------------------- 1 file changed, 87 insertions(+), 245 deletions(-) diff --git a/src/recorder.ts b/src/recorder.ts index 54c34a2..b2fee12 100644 --- a/src/recorder.ts +++ b/src/recorder.ts @@ -1,3 +1,5 @@ +import fs from "node:fs"; +import path from "node:path"; import { EndBehaviorType, entersState, @@ -6,14 +8,19 @@ import { VoiceConnectionStatus, } from "@discordjs/voice"; import type { Client, VoiceChannel } from "discord.js-selfbot-v13"; -import fs from "fs"; -import path from "path"; import prism from "prism-media"; -import { pipeline } from "stream/promises"; import { config } from "./config"; import { PacketFilter } from "./packetFilter"; +import { subscribeToAudioStream } from "./recorder/audioStream"; +import { OpusDecoder } from "./recorder/decoder"; +import { + collectUserMetadata, + createSegmentMetadata, +} from "./recorder/metadata"; +import { SegmentManager } from "./recorder/segment"; +import type { PcmBroadcaster } from "./types"; -const recordingsDir = process.env.RECORDINGS_DIR ?? "./recordings"; +const recordingsDir = config.recordingsDir; // Pastikan folder recordings ada if (!fs.existsSync(recordingsDir)) { @@ -63,44 +70,19 @@ export async function startRecording( } const receiver = connection.receiver; + const broadcaster = globalThis as typeof globalThis & PcmBroadcaster; // Dengarkan siapapun yang mulai bicara receiver.speaking.on("start", async (userId) => { - // Coba ambil data user dari cache atau fetch dari API - const user = - client.users.cache.get(userId) || - (await client.users.fetch(userId).catch(() => null)); - const member = - channel.guild.members.cache.get(userId) || - (await channel.guild.members.fetch(userId).catch(() => null)); - const username = user?.username ?? "Unknown User"; - const avatarUrl = - user?.displayAvatarURL({ format: "png", size: 64 }) ?? - "https://cdn.discordapp.com/embed/avatars/0.png"; - const displayName = member?.displayName ?? username; - const roles = - member?.roles.cache - .filter((role) => role.id !== channel.guild.id) - .sort((a, b) => b.position - a.position) - .map((role) => ({ - id: role.id, - name: role.name, - position: role.position, - })) ?? []; - const highestRole = roles.length > 0 ? roles[0] : null; - const joinedTimestamp = member?.joinedTimestamp ?? null; - - // Tampilkan format "nama user [voice activity]" - console.log(`${username} [voice activity]`); + const userMetadata = await collectUserMetadata(client, userId, channel); + console.log(`${userMetadata.username} [voice activity]`); // Notify webserver - if ((global as any).updateActiveUser) { - (global as any).updateActiveUser(userId, { - username, - avatar: avatarUrl, - speaking: true, - }); - } + broadcaster.updateActiveUser?.(userId, { + username: userMetadata.username, + avatar: userMetadata.avatarUrl, + speaking: true, + }); // Jangan record kalau sudah ada stream aktif untuk user ini if (receiver.subscriptions.has(userId)) return; @@ -108,238 +90,98 @@ export async function startRecording( const timestamp = Date.now(); const sessionStartTime = timestamp; const sessionId = `${userId}-${sessionStartTime}`; - const recordingSegmentMsRaw = Number( - process.env.RECORDING_SEGMENT_MS ?? 5_000, - ); - const recordingSegmentMs = - Number.isFinite(recordingSegmentMsRaw) && recordingSegmentMsRaw > 0 - ? recordingSegmentMsRaw - : 0; const userDir = path.join(recordingsDir, userId); if (!fs.existsSync(userDir)) { fs.mkdirSync(userDir, { recursive: true }); } - const audioStream = receiver.subscribe(userId, { - end: { - behavior: EndBehaviorType.AfterSilence, - duration: 3000, - }, - }); - try { // --- OGG file recording with segment rotation --- const packetFilterForOgg = new PacketFilter(8); + const audioStream = receiver.subscribe(userId, { + end: { + behavior: EndBehaviorType.AfterSilence, + duration: 3000, + }, + }); const oggPacketStream = audioStream.pipe(packetFilterForOgg); - let segmentIndex = 0; - let currentSegment: { - index: number; - startTime: number; - endTime: number | null; - filename: string; - jsonFilename: string; - oggStream: any; - out: fs.WriteStream; - } | null = null; - - const openSegment = () => { - const index = segmentIndex++; - const startTime = Date.now(); - const segmentFilename = path.join(userDir, `${startTime}.ogg`); - const segmentJsonFilename = path.join(userDir, `${startTime}.json`); - const oggStream = new prism.opus.OggLogicalBitstream({ - opusHead: new prism.opus.OpusHead({ - channelCount: 2, - sampleRate: 48000, - }), - pageSizeControl: { maxPackets: 10 }, - crc: true, - }); - const out = fs.createWriteStream(segmentFilename); - oggPacketStream.pipe(oggStream).pipe(out); - - const segment = { - index, - startTime, - endTime: null as number | null, - filename: segmentFilename, - jsonFilename: segmentJsonFilename, - oggStream, - out, - }; - - out.on("finish", () => { - if (config.verbose) { - console.log(`[recorder] Saved: ${segment.filename}`); - } - const endTime = segment.endTime ?? Date.now(); - - const eventMetadata = { - userId, - username, - tag: user?.tag ?? "Unknown#0000", - displayName, - avatarUrl, - bot: user?.bot ?? false, - roles, - highestRole, - joinedTimestamp, - sessionId, - sessionStartTime, - segmentIndex: segment.index, - segmentMs: recordingSegmentMs, - startTime: segment.startTime, - endTime, - durationMs: endTime - segment.startTime, - filename: path.basename(segment.filename), - }; - fs.writeFileSync( - segment.jsonFilename, - JSON.stringify(eventMetadata, null, 2), - ); - if (config.verbose) { - console.log(`[recorder] Saved metadata: ${segment.jsonFilename}`); - } - }); - - out.on("error", (err) => { - console.error(`[recorder] File write error ${userId}:`, err.message); - }); - - return segment; - }; - - const closeSegment = () => { - if (!currentSegment) return; - currentSegment.endTime = Date.now(); - oggPacketStream.unpipe(currentSegment.oggStream); - currentSegment.oggStream.end(); - currentSegment = null; - }; - - const rotateSegmentIfNeeded = () => { - if (!currentSegment) return; - if (recordingSegmentMs <= 0) return; - if (Date.now() - currentSegment.startTime < recordingSegmentMs) return; - closeSegment(); - currentSegment = openSegment(); - }; - - currentSegment = openSegment(); + const segmentManager = new SegmentManager( + userDir, + config.recordingSegmentMs, + ); // --- Web broadcast: prism decoder with safe restart and cooldown --- - // OpusScript can crash on long/invalid streams; avoid taking down the process. - const decoderConfig = { - frameSize: 960, - channels: 2 as const, - rate: 48000 as const, - }; - const decoderCooldownMs = 30_000; - const decoderRotateMs = Number(process.env.DECODER_ROTATE_MS ?? 5_000); - let currentDecoder: prism.opus.Decoder | null = null; - let decoderDisabledUntil = 0; - let decoderCreatedAt = 0; + const decoder = new OpusDecoder({ + cooldownMs: config.decoderCooldownMs, + rotateMs: config.decoderRotateMs, + onData: (pcm) => { + if (!broadcaster.broadcastPcmToWeb) return; + // Downsample 48kHz stereo → 24kHz mono (left channel, every 2nd sample) + const outBuf = Buffer.alloc(pcm.length / 4); + for (let i = 0; i < outBuf.length / 2; i++) { + outBuf.writeInt16LE(pcm.readInt16LE(i * 8), i * 2); + } + broadcaster.broadcastPcmToWeb(outBuf, userId); + }, + }); - const handlePcm = (pcm: Buffer) => { - if (!(global as any).broadcastPcmToWeb) return; - // Downsample 48kHz stereo → 24kHz mono (left channel, every 2nd sample) - const outBuf = Buffer.alloc(pcm.length / 4); - for (let i = 0; i < outBuf.length / 2; i++) { - outBuf.writeInt16LE(pcm.readInt16LE(i * 8), i * 2); + let currentSegment = segmentManager.open(oggPacketStream); + currentSegment.out.on("finish", () => { + if (config.verbose) { + console.log(`[recorder] Saved: ${currentSegment.filename}`); } - (global as any).broadcastPcmToWeb(outBuf, userId); - }; - - const destroyDecoder = () => { - if (!currentDecoder) return; - currentDecoder.removeAllListeners(); - currentDecoder.destroy(); - currentDecoder = null; - decoderCreatedAt = 0; - }; - - const createDecoder = () => { - if (Date.now() < decoderDisabledUntil) return null; - try { - const d = new prism.opus.Decoder(decoderConfig); - d.on("data", handlePcm); - d.on("error", (err) => { - console.warn("[recorder] Opus decoder error, cooling down:", err); - decoderDisabledUntil = Date.now() + decoderCooldownMs; - destroyDecoder(); - }); - decoderCreatedAt = Date.now(); - return d; - } catch (err) { - console.warn( - "[recorder] Opus decoder init failed, cooling down:", - err, + const metadata = createSegmentMetadata( + userMetadata, + currentSegment, + sessionId, + sessionStartTime, + config.recordingSegmentMs, + ); + fs.writeFileSync( + currentSegment.jsonFilename, + JSON.stringify(metadata, null, 2), + ); + if (config.verbose) { + console.log( + `[recorder] Saved metadata: ${currentSegment.jsonFilename}`, ); - decoderDisabledUntil = Date.now() + decoderCooldownMs; - return null; } - }; + }); - const rotateDecoderIfNeeded = () => { - if (!currentDecoder || decoderRotateMs <= 0) return; - if (Date.now() - decoderCreatedAt < decoderRotateMs) return; - destroyDecoder(); - currentDecoder = createDecoder(); - }; - - const ensureDecoder = () => { - if (!currentDecoder) { - currentDecoder = createDecoder(); - } - return currentDecoder; - }; + currentSegment.out.on("error", (err) => { + console.error(`[recorder] File write error ${userId}:`, err.message); + }); // Feed Opus packets one-by-one - let packetCount = 0; - audioStream.on("data", (chunk: Buffer) => { - packetCount++; - if (packetCount <= 5) { - console.log( - `[recorder] Pkt #${packetCount} from ${userId}: ${chunk.length}b | 0x${chunk.slice(0, 4).toString("hex")}`, - ); - } - if (chunk.length < 8) return; // skip tiny control/DTX packets - rotateSegmentIfNeeded(); - if (!(global as any).broadcastPcmToWeb) return; - rotateDecoderIfNeeded(); - const decoder = ensureDecoder(); - if (!decoder) return; - try { + subscribeToAudioStream(receiver, userId, { + onPacket: (chunk) => { + if (chunk.length < 8) return; + segmentManager.rotateIfNeeded(oggPacketStream); + if (!broadcaster.broadcastPcmToWeb) return; + decoder.rotateIfNeeded(); decoder.write(chunk); - } catch (err) { - console.warn( - "[recorder] Opus decoder write failed, cooling down:", - err, - ); - decoderDisabledUntil = Date.now() + decoderCooldownMs; - destroyDecoder(); - } - }); - - audioStream.on("end", () => { - closeSegment(); - destroyDecoder(); - if ((global as any).updateActiveUser) { - (global as any).updateActiveUser(userId, { - username, - avatar: avatarUrl, + }, + onEnd: () => { + const segment = segmentManager.close(oggPacketStream); + decoder.destroy(); + broadcaster.updateActiveUser?.(userId, { + username: userMetadata.username, + avatar: userMetadata.avatarUrl, speaking: false, }); - } + }, + onError: (error) => { + segmentManager.close(oggPacketStream); + decoder.destroy(); + console.error( + `[recorder] Audio Stream error ${userId}:`, + error.message, + ); + }, }); - audioStream.on("error", (err) => { - closeSegment(); - destroyDecoder(); - console.error(`[recorder] Audio Stream error ${userId}:`, err.message); - }); packetFilterForOgg.on("error", (err) => { - closeSegment(); + segmentManager.close(oggPacketStream); console.error( `[recorder] PacketFilter(ogg) error ${userId}:`, err.message,