refactor: implement robust Opus decoding with error-resilient streams and transition to pull-based audio transmission for Discord output
This commit is contained in:
@@ -41,6 +41,14 @@ export class DiscordPlayer {
|
|||||||
this.player.play(resource);
|
this.player.play(resource);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Pauses the wrapped audio player.
 *
 * Forwards to `this.player.pause(true)`.
 * NOTE(review): the `true` argument is presumably the player's
 * "interpolate silence" flag — confirm against the player library.
 */
public pause(): void {
  this.player.pause(true);
}
|
||||||
|
|
||||||
|
/**
 * Resumes the wrapped audio player by forwarding to `this.player.unpause()`.
 */
public unpause(): void {
  this.player.unpause();
}
|
||||||
|
|
||||||
/**
 * Stops the wrapped audio player by forwarding to `this.player.stop()`.
 */
public stop(): void {
  this.player.stop();
}
|
||||||
|
|||||||
@@ -90,7 +90,7 @@ export async function startRecording(client: Client, channel: VoiceChannel): Pro
|
|||||||
const audioStream = receiver.subscribe(userId, {
|
const audioStream = receiver.subscribe(userId, {
|
||||||
end: {
|
end: {
|
||||||
behavior: EndBehaviorType.AfterSilence,
|
behavior: EndBehaviorType.AfterSilence,
|
||||||
duration: 3000, // 3 seconds — avoids FFmpeg restart overhead between utterances
|
duration: 3000,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -105,40 +105,48 @@ export async function startRecording(client: Client, channel: VoiceChannel): Pro
|
|||||||
const out = fs.createWriteStream(filename);
|
const out = fs.createWriteStream(filename);
|
||||||
audioStream.pipe(packetFilterForOgg).pipe(oggStream).pipe(out);
|
audioStream.pipe(packetFilterForOgg).pipe(oggStream).pipe(out);
|
||||||
|
|
||||||
// --- Web broadcast: pure JS Opus → PCM, no FFmpeg ---
|
// --- Web broadcast: prism decoder with auto-recreate on error ---
|
||||||
// Create a fresh decoder for each user session
|
// Prism's Transform stream enters a dead error state after first bad packet.
|
||||||
const opusDecoder = new prism.opus.Decoder({ frameSize: 960, channels: 2, rate: 48000 });
|
// We recreate the decoder instance when this happens, so subsequent packets
|
||||||
|
// are decoded normally. Each packet failure is fully isolated.
|
||||||
|
/**
 * Creates an Opus decoder (960-sample frames, 2 channels, 48 kHz) whose
 * decoded output is forwarded to `onPcm`.
 *
 * When the decoder emits `error`, a brand-new decoder is built with the same
 * callback and stored in `currentDecoder`, so later packets go to a working
 * instance instead of the failed one.
 *
 * @param onPcm - Called with every PCM buffer the decoder emits.
 * @returns The newly created decoder.
 */
function makePcmListener(onPcm: (pcm: Buffer) => void) {
  const decoder = new prism.opus.Decoder({ rate: 48000, channels: 2, frameSize: 960 });

  // Swap the shared decoder reference to a fresh instance wired to the same callback.
  const replaceWithFreshDecoder = () => {
    currentDecoder = makePcmListener(onPcm);
  };

  decoder.on('data', onPcm);
  decoder.on('error', replaceWithFreshDecoder);
  return decoder;
}
|
||||||
|
|
||||||
// CRITICAL: Swallow decode errors (DAVE/bad packets) without crashing
|
const handlePcm = (pcm: Buffer) => {
|
||||||
opusDecoder.on('error', () => {});
|
|
||||||
|
|
||||||
// Downsample 48kHz stereo → 24kHz mono (take left channel, every 2nd sample)
|
|
||||||
opusDecoder.on('data', (pcm: Buffer) => {
|
|
||||||
if (!(global as any).broadcastPcmToWeb) return;
|
if (!(global as any).broadcastPcmToWeb) return;
|
||||||
// Input: 48kHz stereo s16le → 4 bytes per sample-pair
|
// Downsample 48kHz stereo → 24kHz mono (left channel, every 2nd sample)
|
||||||
// Output: 24kHz mono s16le → 2 bytes per sample
|
|
||||||
const outBuf = Buffer.alloc(pcm.length / 4);
|
const outBuf = Buffer.alloc(pcm.length / 4);
|
||||||
for (let i = 0; i < outBuf.length / 2; i++) {
|
for (let i = 0; i < outBuf.length / 2; i++) {
|
||||||
outBuf.writeInt16LE(pcm.readInt16LE(i * 8), i * 2);
|
outBuf.writeInt16LE(pcm.readInt16LE(i * 8), i * 2);
|
||||||
}
|
}
|
||||||
(global as any).broadcastPcmToWeb(outBuf, userId);
|
(global as any).broadcastPcmToWeb(outBuf, userId);
|
||||||
});
|
};
|
||||||
|
|
||||||
// Feed Opus packets one-by-one; catch per-packet decode errors
|
let currentDecoder = makePcmListener(handlePcm);
|
||||||
|
|
||||||
|
// Feed Opus packets one-by-one
|
||||||
let packetCount = 0;
|
let packetCount = 0;
|
||||||
audioStream.on('data', (chunk: Buffer) => {
|
audioStream.on('data', (chunk: Buffer) => {
|
||||||
packetCount++;
|
packetCount++;
|
||||||
if (packetCount <= 5) {
|
if (packetCount <= 5) {
|
||||||
console.log(`[recorder] Pkt #${packetCount} from ${userId}: ${chunk.length}b | 0x${chunk.slice(0,4).toString('hex')}`);
|
console.log(`[recorder] Pkt #${packetCount} from ${userId}: ${chunk.length}b | 0x${chunk.slice(0,4).toString('hex')}`);
|
||||||
}
|
}
|
||||||
if (chunk.length < 8) return; // skip tiny control packets
|
if (chunk.length < 8) return; // skip tiny control/DTX packets
|
||||||
try {
|
try {
|
||||||
opusDecoder.write(chunk);
|
currentDecoder.write(chunk);
|
||||||
} catch (_) {} // per-packet isolation — don't let one bad packet stop the stream
|
} catch (_) {
|
||||||
|
currentDecoder = makePcmListener(handlePcm);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
audioStream.on('end', () => {
|
audioStream.on('end', () => {
|
||||||
opusDecoder.end();
|
|
||||||
if ((global as any).updateActiveUser) {
|
if ((global as any).updateActiveUser) {
|
||||||
(global as any).updateActiveUser(userId, { username, avatar, speaking: false });
|
(global as any).updateActiveUser(userId, { username, avatar, speaking: false });
|
||||||
}
|
}
|
||||||
|
|||||||
107
src/webserver.ts
107
src/webserver.ts
@@ -8,20 +8,31 @@ import { discordPlayer } from "./player";
|
|||||||
const activeUsers = new Map<string, { username: string, avatar: string, speaking: boolean }>();
|
const activeUsers = new Map<string, { username: string, avatar: string, speaking: boolean }>();
|
||||||
let wsClients = new Set<any>();
|
let wsClients = new Set<any>();
|
||||||
|
|
||||||
// --- Upsampling: 24kHz mono s16le → 48kHz stereo s16le (pure JS, no FFmpeg) ---
|
// Upsample 24kHz mono s16le → 48kHz stereo s16le (pure JS)
|
||||||
// Each input sample is duplicated into 2 stereo pairs to double the sample rate.
|
/**
 * Converts mono signed 16-bit little-endian PCM into stereo PCM at twice the
 * sample rate (24 kHz mono → 48 kHz stereo in this file's usage).
 *
 * Each input sample is written four times in a row: left and right for two
 * consecutive output frames. No interpolation is applied.
 *
 * @param mono24k - Raw s16le mono samples. A trailing odd byte (an incomplete
 *   sample) is ignored instead of triggering an out-of-range read.
 * @returns A new buffer holding 8 output bytes per complete input sample.
 */
function upsample(mono24k: Buffer): Buffer {
  // Count whole 2-byte samples only; an odd-length input would otherwise make
  // the loop read past the end of the buffer and throw.
  const sampleCount = Math.floor(mono24k.length / 2);
  const out = Buffer.alloc(sampleCount * 8);

  for (let i = 0; i < sampleCount; i++) {
    const sample = mono24k.readInt16LE(i * 2);
    const base = i * 8;
    // Four copies: L/R for the first output frame, L/R for the duplicated frame.
    for (let copy = 0; copy < 4; copy++) {
      out.writeInt16LE(sample, base + copy * 2);
    }
  }

  return out;
}
|
||||||
|
|
||||||
|
// Calculate RMS dB level of a PCM s16le buffer
|
||||||
|
/**
 * Computes the RMS level, in dB relative to full scale, of a signed 16-bit
 * little-endian PCM buffer.
 *
 * Samples are normalised by 32768 before squaring. The RMS value is clamped
 * to 1e-10 before taking the logarithm, so silence yields about -200 dB
 * rather than -Infinity.
 *
 * @param pcm - Raw s16le samples. A trailing odd byte is ignored.
 * @returns The level in dB; the clamped floor value for empty or all-zero input.
 */
function rmsDb(pcm: Buffer): number {
  const RMS_FLOOR = 1e-10;
  // Whole samples only: a fractional count would read past the buffer end.
  const samples = Math.floor(pcm.length / 2);

  // No samples would give 0/0 = NaN, which Math.max propagates; report the floor instead.
  if (samples === 0) {
    return 20 * Math.log10(RMS_FLOOR);
  }

  let sum = 0;
  for (let i = 0; i < samples; i++) {
    const s = pcm.readInt16LE(i * 2) / 32768;
    sum += s * s;
  }

  const rms = Math.sqrt(sum / samples);
  return 20 * Math.log10(Math.max(rms, RMS_FLOOR));
}
|
||||||
|
|
||||||
export function startWebserver(port: number = 3000) {
|
export function startWebserver(port: number = 3000) {
|
||||||
const app = express();
|
const app = express();
|
||||||
const server = http.createServer(app);
|
const server = http.createServer(app);
|
||||||
@@ -32,7 +43,7 @@ export function startWebserver(port: number = 3000) {
|
|||||||
|
|
||||||
app.use(express.static(path.join(__dirname, "../public")));
|
app.use(express.static(path.join(__dirname, "../public")));
|
||||||
|
|
||||||
// --- Inbound: Discord PCM → tagged chunks → browser (set in recorder.ts) ---
|
// Inbound: Discord PCM → tagged chunks → browser
|
||||||
(global as any).broadcastPcmToWeb = (chunk: Buffer, userId: string) => {
|
(global as any).broadcastPcmToWeb = (chunk: Buffer, userId: string) => {
|
||||||
let hash = 0;
|
let hash = 0;
|
||||||
for (let i = 0; i < userId.length; i++) {
|
for (let i = 0; i < userId.length; i++) {
|
||||||
@@ -62,33 +73,80 @@ export function startWebserver(port: number = 3000) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Outbound: browser PCM (24kHz mono) → Opus → Discord, NO FFmpeg ---
|
// --- Outbound: browser PCM (24kHz mono) → Opus → Discord ---
|
||||||
const RATE = 48000;
|
const RATE = 48000;
|
||||||
const CHANNELS = 2;
|
const CHANNELS = 2;
|
||||||
const FRAME_SIZE = 960; // 20ms @ 48kHz
|
const FRAME_SIZE = 960;
|
||||||
const BYTES_PER_FRAME = FRAME_SIZE * CHANNELS * 2; // 3840 bytes
|
const BYTES_PER_FRAME = FRAME_SIZE * CHANNELS * 2; // 3840 bytes = 20ms
|
||||||
|
const SILENCE_TAIL_MS = 300; // continue sending silence for 300ms after browser stops
|
||||||
|
const MAX_BUF_BYTES = BYTES_PER_FRAME * 50; // cap at 1 second to avoid runaway buffer
|
||||||
|
|
||||||
const opusEncoder = new prism.opus.Encoder({ rate: RATE, channels: CHANNELS, frameSize: FRAME_SIZE });
|
const opusEncoder = new prism.opus.Encoder({ rate: RATE, channels: CHANNELS, frameSize: FRAME_SIZE });
|
||||||
const oggBitstream = new prism.opus.OggLogicalBitstream({
|
const oggBitstream = new prism.opus.OggLogicalBitstream({
|
||||||
opusHead: new prism.opus.OpusHead({ channelCount: CHANNELS, sampleRate: RATE }),
|
opusHead: new prism.opus.OpusHead({ channelCount: CHANNELS, sampleRate: RATE }),
|
||||||
pageSizeControl: { maxPackets: 10 },
|
pageSizeControl: { maxPackets: 1 }, // 1 packet per page = 20ms latency
|
||||||
crc: true,
|
crc: true,
|
||||||
});
|
});
|
||||||
opusEncoder.on('error', () => {});
|
opusEncoder.on('error', () => {});
|
||||||
|
|
||||||
opusEncoder.pipe(oggBitstream);
|
opusEncoder.pipe(oggBitstream);
|
||||||
// Prime the encoder immediately so OGG headers are emitted before player reads
|
|
||||||
|
// Prime OGG headers before player starts reading
|
||||||
opusEncoder.write(Buffer.alloc(BYTES_PER_FRAME, 0));
|
opusEncoder.write(Buffer.alloc(BYTES_PER_FRAME, 0));
|
||||||
discordPlayer.playStream(oggBitstream);
|
discordPlayer.playStream(oggBitstream);
|
||||||
|
discordPlayer.pause();
|
||||||
|
|
||||||
let pcmBuffer = Buffer.alloc(0);
|
let pcmBuffer = Buffer.alloc(0);
|
||||||
let lastBrowserAudioTime = 0;
|
let lastBrowserAudioTime = 0;
|
||||||
|
let playerPaused = true;
|
||||||
const SILENCE_FRAME = Buffer.alloc(BYTES_PER_FRAME, 0);
|
const SILENCE_FRAME = Buffer.alloc(BYTES_PER_FRAME, 0);
|
||||||
|
|
||||||
// Keep encoder alive with silence when browser isn't sending
|
// Log level every 2 seconds
|
||||||
|
let dbAccum = 0, dbCount = 0;
|
||||||
setInterval(() => {
|
setInterval(() => {
|
||||||
if (Date.now() - lastBrowserAudioTime > 40) {
|
if (dbCount > 0) {
|
||||||
opusEncoder.write(SILENCE_FRAME);
|
const avg = dbAccum / dbCount;
|
||||||
|
console.log(`[transmit] Audio level: ${avg.toFixed(1)} dBFS (${dbCount} frames/2s)`);
|
||||||
|
dbAccum = 0; dbCount = 0;
|
||||||
|
}
|
||||||
|
}, 2000);
|
||||||
|
|
||||||
|
// PULL-BASED encode loop: fires every 20ms, pulls exactly one frame from buffer.
|
||||||
|
// This avoids the timing conflict where browser bursts and silence timer collide.
|
||||||
|
setInterval(() => {
|
||||||
|
const msSinceAudio = Date.now() - lastBrowserAudioTime;
|
||||||
|
let frame: Buffer | null = null;
|
||||||
|
|
||||||
|
if (pcmBuffer.length >= BYTES_PER_FRAME) {
|
||||||
|
// Real audio available
|
||||||
|
frame = pcmBuffer.slice(0, BYTES_PER_FRAME);
|
||||||
|
pcmBuffer = pcmBuffer.slice(BYTES_PER_FRAME);
|
||||||
|
|
||||||
|
// Track level for logging
|
||||||
|
dbAccum += rmsDb(frame);
|
||||||
|
dbCount++;
|
||||||
|
|
||||||
|
if (playerPaused) {
|
||||||
|
discordPlayer.unpause();
|
||||||
|
playerPaused = false;
|
||||||
|
console.log("[transmit] Transmitting — Discord indicator ON");
|
||||||
|
}
|
||||||
|
} else if (msSinceAudio < SILENCE_TAIL_MS && msSinceAudio > 0) {
|
||||||
|
// Buffer drained but audio was recent — pad silence to avoid OGG gap
|
||||||
|
frame = SILENCE_FRAME;
|
||||||
|
} else if (!playerPaused && msSinceAudio >= SILENCE_TAIL_MS) {
|
||||||
|
// No audio for a while — pause Discord indicator
|
||||||
|
discordPlayer.pause();
|
||||||
|
playerPaused = true;
|
||||||
|
console.log("[transmit] Stopped — Discord indicator OFF");
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
return; // already paused, nothing to do
|
||||||
|
}
|
||||||
|
|
||||||
|
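// Write one frame. The frame is always handed to the encoder; if write() reports backpressure we only attach a no-op one-shot 'drain' listener and carry on (the tick is not skipped).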
|
||||||
|
const ok = opusEncoder.write(frame);
|
||||||
|
if (!ok) {
|
||||||
|
opusEncoder.once('drain', () => {}); // re-arm drain without blocking
|
||||||
}
|
}
|
||||||
}, 20);
|
}, 20);
|
||||||
|
|
||||||
@@ -105,15 +163,12 @@ export function startWebserver(port: number = 3000) {
|
|||||||
if (!Buffer.isBuffer(data)) return;
|
if (!Buffer.isBuffer(data)) return;
|
||||||
lastBrowserAudioTime = Date.now();
|
lastBrowserAudioTime = Date.now();
|
||||||
|
|
||||||
// Upsample browser 24kHz mono → 48kHz stereo
|
// Upsample 24kHz mono → 48kHz stereo and add to buffer
|
||||||
const upsampled = upsample24kMonoTo48kStereo(data);
|
const upsampled = upsample(data);
|
||||||
pcmBuffer = Buffer.concat([pcmBuffer, upsampled]);
|
|
||||||
|
|
||||||
// Encode complete Opus frames
|
// Cap buffer to avoid runaway growth during stall
|
||||||
while (pcmBuffer.length >= BYTES_PER_FRAME) {
|
if (pcmBuffer.length < MAX_BUF_BYTES) {
|
||||||
const frame = pcmBuffer.slice(0, BYTES_PER_FRAME);
|
pcmBuffer = Buffer.concat([pcmBuffer, upsampled]);
|
||||||
pcmBuffer = pcmBuffer.slice(BYTES_PER_FRAME);
|
|
||||||
opusEncoder.write(frame);
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user