refactor: modularize recorder orchestration

This commit is contained in:
MythEclipse
2026-05-13 16:00:21 +07:00
parent f655daa0c7
commit 80081a6f62

View File

@@ -1,3 +1,5 @@
import fs from "node:fs";
import path from "node:path";
import { import {
EndBehaviorType, EndBehaviorType,
entersState, entersState,
@@ -6,14 +8,19 @@ import {
VoiceConnectionStatus, VoiceConnectionStatus,
} from "@discordjs/voice"; } from "@discordjs/voice";
import type { Client, VoiceChannel } from "discord.js-selfbot-v13"; import type { Client, VoiceChannel } from "discord.js-selfbot-v13";
import fs from "fs";
import path from "path";
import prism from "prism-media"; import prism from "prism-media";
import { pipeline } from "stream/promises";
import { config } from "./config"; import { config } from "./config";
import { PacketFilter } from "./packetFilter"; import { PacketFilter } from "./packetFilter";
import { subscribeToAudioStream } from "./recorder/audioStream";
import { OpusDecoder } from "./recorder/decoder";
import {
collectUserMetadata,
createSegmentMetadata,
} from "./recorder/metadata";
import { SegmentManager } from "./recorder/segment";
import type { PcmBroadcaster } from "./types";
const recordingsDir = process.env.RECORDINGS_DIR ?? "./recordings"; const recordingsDir = config.recordingsDir;
// Pastikan folder recordings ada // Pastikan folder recordings ada
if (!fs.existsSync(recordingsDir)) { if (!fs.existsSync(recordingsDir)) {
@@ -63,44 +70,19 @@ export async function startRecording(
} }
const receiver = connection.receiver; const receiver = connection.receiver;
const broadcaster = globalThis as typeof globalThis & PcmBroadcaster;
// Dengarkan siapapun yang mulai bicara // Dengarkan siapapun yang mulai bicara
receiver.speaking.on("start", async (userId) => { receiver.speaking.on("start", async (userId) => {
// Coba ambil data user dari cache atau fetch dari API const userMetadata = await collectUserMetadata(client, userId, channel);
const user = console.log(`${userMetadata.username} [voice activity]`);
client.users.cache.get(userId) ||
(await client.users.fetch(userId).catch(() => null));
const member =
channel.guild.members.cache.get(userId) ||
(await channel.guild.members.fetch(userId).catch(() => null));
const username = user?.username ?? "Unknown User";
const avatarUrl =
user?.displayAvatarURL({ format: "png", size: 64 }) ??
"https://cdn.discordapp.com/embed/avatars/0.png";
const displayName = member?.displayName ?? username;
const roles =
member?.roles.cache
.filter((role) => role.id !== channel.guild.id)
.sort((a, b) => b.position - a.position)
.map((role) => ({
id: role.id,
name: role.name,
position: role.position,
})) ?? [];
const highestRole = roles.length > 0 ? roles[0] : null;
const joinedTimestamp = member?.joinedTimestamp ?? null;
// Tampilkan format "nama user [voice activity]"
console.log(`${username} [voice activity]`);
// Notify webserver // Notify webserver
if ((global as any).updateActiveUser) { broadcaster.updateActiveUser?.(userId, {
(global as any).updateActiveUser(userId, { username: userMetadata.username,
username, avatar: userMetadata.avatarUrl,
avatar: avatarUrl, speaking: true,
speaking: true, });
});
}
// Jangan record kalau sudah ada stream aktif untuk user ini // Jangan record kalau sudah ada stream aktif untuk user ini
if (receiver.subscriptions.has(userId)) return; if (receiver.subscriptions.has(userId)) return;
@@ -108,238 +90,98 @@ export async function startRecording(
const timestamp = Date.now(); const timestamp = Date.now();
const sessionStartTime = timestamp; const sessionStartTime = timestamp;
const sessionId = `${userId}-${sessionStartTime}`; const sessionId = `${userId}-${sessionStartTime}`;
const recordingSegmentMsRaw = Number(
process.env.RECORDING_SEGMENT_MS ?? 5_000,
);
const recordingSegmentMs =
Number.isFinite(recordingSegmentMsRaw) && recordingSegmentMsRaw > 0
? recordingSegmentMsRaw
: 0;
const userDir = path.join(recordingsDir, userId); const userDir = path.join(recordingsDir, userId);
if (!fs.existsSync(userDir)) { if (!fs.existsSync(userDir)) {
fs.mkdirSync(userDir, { recursive: true }); fs.mkdirSync(userDir, { recursive: true });
} }
const audioStream = receiver.subscribe(userId, {
end: {
behavior: EndBehaviorType.AfterSilence,
duration: 3000,
},
});
try { try {
// --- OGG file recording with segment rotation --- // --- OGG file recording with segment rotation ---
const packetFilterForOgg = new PacketFilter(8); const packetFilterForOgg = new PacketFilter(8);
const audioStream = receiver.subscribe(userId, {
end: {
behavior: EndBehaviorType.AfterSilence,
duration: 3000,
},
});
const oggPacketStream = audioStream.pipe(packetFilterForOgg); const oggPacketStream = audioStream.pipe(packetFilterForOgg);
let segmentIndex = 0; const segmentManager = new SegmentManager(
let currentSegment: { userDir,
index: number; config.recordingSegmentMs,
startTime: number; );
endTime: number | null;
filename: string;
jsonFilename: string;
oggStream: any;
out: fs.WriteStream;
} | null = null;
const openSegment = () => {
const index = segmentIndex++;
const startTime = Date.now();
const segmentFilename = path.join(userDir, `${startTime}.ogg`);
const segmentJsonFilename = path.join(userDir, `${startTime}.json`);
const oggStream = new prism.opus.OggLogicalBitstream({
opusHead: new prism.opus.OpusHead({
channelCount: 2,
sampleRate: 48000,
}),
pageSizeControl: { maxPackets: 10 },
crc: true,
});
const out = fs.createWriteStream(segmentFilename);
oggPacketStream.pipe(oggStream).pipe(out);
const segment = {
index,
startTime,
endTime: null as number | null,
filename: segmentFilename,
jsonFilename: segmentJsonFilename,
oggStream,
out,
};
out.on("finish", () => {
if (config.verbose) {
console.log(`[recorder] Saved: ${segment.filename}`);
}
const endTime = segment.endTime ?? Date.now();
const eventMetadata = {
userId,
username,
tag: user?.tag ?? "Unknown#0000",
displayName,
avatarUrl,
bot: user?.bot ?? false,
roles,
highestRole,
joinedTimestamp,
sessionId,
sessionStartTime,
segmentIndex: segment.index,
segmentMs: recordingSegmentMs,
startTime: segment.startTime,
endTime,
durationMs: endTime - segment.startTime,
filename: path.basename(segment.filename),
};
fs.writeFileSync(
segment.jsonFilename,
JSON.stringify(eventMetadata, null, 2),
);
if (config.verbose) {
console.log(`[recorder] Saved metadata: ${segment.jsonFilename}`);
}
});
out.on("error", (err) => {
console.error(`[recorder] File write error ${userId}:`, err.message);
});
return segment;
};
const closeSegment = () => {
if (!currentSegment) return;
currentSegment.endTime = Date.now();
oggPacketStream.unpipe(currentSegment.oggStream);
currentSegment.oggStream.end();
currentSegment = null;
};
const rotateSegmentIfNeeded = () => {
if (!currentSegment) return;
if (recordingSegmentMs <= 0) return;
if (Date.now() - currentSegment.startTime < recordingSegmentMs) return;
closeSegment();
currentSegment = openSegment();
};
currentSegment = openSegment();
// --- Web broadcast: prism decoder with safe restart and cooldown --- // --- Web broadcast: prism decoder with safe restart and cooldown ---
// OpusScript can crash on long/invalid streams; avoid taking down the process. const decoder = new OpusDecoder({
const decoderConfig = { cooldownMs: config.decoderCooldownMs,
frameSize: 960, rotateMs: config.decoderRotateMs,
channels: 2 as const, onData: (pcm) => {
rate: 48000 as const, if (!broadcaster.broadcastPcmToWeb) return;
}; // Downsample 48kHz stereo → 24kHz mono (left channel, every 2nd sample)
const decoderCooldownMs = 30_000; const outBuf = Buffer.alloc(pcm.length / 4);
const decoderRotateMs = Number(process.env.DECODER_ROTATE_MS ?? 5_000); for (let i = 0; i < outBuf.length / 2; i++) {
let currentDecoder: prism.opus.Decoder | null = null; outBuf.writeInt16LE(pcm.readInt16LE(i * 8), i * 2);
let decoderDisabledUntil = 0; }
let decoderCreatedAt = 0; broadcaster.broadcastPcmToWeb(outBuf, userId);
},
});
const handlePcm = (pcm: Buffer) => { let currentSegment = segmentManager.open(oggPacketStream);
if (!(global as any).broadcastPcmToWeb) return; currentSegment.out.on("finish", () => {
// Downsample 48kHz stereo → 24kHz mono (left channel, every 2nd sample) if (config.verbose) {
const outBuf = Buffer.alloc(pcm.length / 4); console.log(`[recorder] Saved: ${currentSegment.filename}`);
for (let i = 0; i < outBuf.length / 2; i++) {
outBuf.writeInt16LE(pcm.readInt16LE(i * 8), i * 2);
} }
(global as any).broadcastPcmToWeb(outBuf, userId); const metadata = createSegmentMetadata(
}; userMetadata,
currentSegment,
const destroyDecoder = () => { sessionId,
if (!currentDecoder) return; sessionStartTime,
currentDecoder.removeAllListeners(); config.recordingSegmentMs,
currentDecoder.destroy(); );
currentDecoder = null; fs.writeFileSync(
decoderCreatedAt = 0; currentSegment.jsonFilename,
}; JSON.stringify(metadata, null, 2),
);
const createDecoder = () => { if (config.verbose) {
if (Date.now() < decoderDisabledUntil) return null; console.log(
try { `[recorder] Saved metadata: ${currentSegment.jsonFilename}`,
const d = new prism.opus.Decoder(decoderConfig);
d.on("data", handlePcm);
d.on("error", (err) => {
console.warn("[recorder] Opus decoder error, cooling down:", err);
decoderDisabledUntil = Date.now() + decoderCooldownMs;
destroyDecoder();
});
decoderCreatedAt = Date.now();
return d;
} catch (err) {
console.warn(
"[recorder] Opus decoder init failed, cooling down:",
err,
); );
decoderDisabledUntil = Date.now() + decoderCooldownMs;
return null;
} }
}; });
const rotateDecoderIfNeeded = () => { currentSegment.out.on("error", (err) => {
if (!currentDecoder || decoderRotateMs <= 0) return; console.error(`[recorder] File write error ${userId}:`, err.message);
if (Date.now() - decoderCreatedAt < decoderRotateMs) return; });
destroyDecoder();
currentDecoder = createDecoder();
};
const ensureDecoder = () => {
if (!currentDecoder) {
currentDecoder = createDecoder();
}
return currentDecoder;
};
// Feed Opus packets one-by-one // Feed Opus packets one-by-one
let packetCount = 0; subscribeToAudioStream(receiver, userId, {
audioStream.on("data", (chunk: Buffer) => { onPacket: (chunk) => {
packetCount++; if (chunk.length < 8) return;
if (packetCount <= 5) { segmentManager.rotateIfNeeded(oggPacketStream);
console.log( if (!broadcaster.broadcastPcmToWeb) return;
`[recorder] Pkt #${packetCount} from ${userId}: ${chunk.length}b | 0x${chunk.slice(0, 4).toString("hex")}`, decoder.rotateIfNeeded();
);
}
if (chunk.length < 8) return; // skip tiny control/DTX packets
rotateSegmentIfNeeded();
if (!(global as any).broadcastPcmToWeb) return;
rotateDecoderIfNeeded();
const decoder = ensureDecoder();
if (!decoder) return;
try {
decoder.write(chunk); decoder.write(chunk);
} catch (err) { },
console.warn( onEnd: () => {
"[recorder] Opus decoder write failed, cooling down:", const segment = segmentManager.close(oggPacketStream);
err, decoder.destroy();
); broadcaster.updateActiveUser?.(userId, {
decoderDisabledUntil = Date.now() + decoderCooldownMs; username: userMetadata.username,
destroyDecoder(); avatar: userMetadata.avatarUrl,
}
});
audioStream.on("end", () => {
closeSegment();
destroyDecoder();
if ((global as any).updateActiveUser) {
(global as any).updateActiveUser(userId, {
username,
avatar: avatarUrl,
speaking: false, speaking: false,
}); });
} },
onError: (error) => {
segmentManager.close(oggPacketStream);
decoder.destroy();
console.error(
`[recorder] Audio Stream error ${userId}:`,
error.message,
);
},
}); });
audioStream.on("error", (err) => {
closeSegment();
destroyDecoder();
console.error(`[recorder] Audio Stream error ${userId}:`, err.message);
});
packetFilterForOgg.on("error", (err) => { packetFilterForOgg.on("error", (err) => {
closeSegment(); segmentManager.close(oggPacketStream);
console.error( console.error(
`[recorder] PacketFilter(ogg) error ${userId}:`, `[recorder] PacketFilter(ogg) error ${userId}:`,
err.message, err.message,