feat: Voice Gateway v8

This commit is contained in:
Elysia
2025-03-02 18:28:47 +07:00
parent 33b507fc6f
commit 756ec458bc
23 changed files with 740 additions and 334 deletions

View File

@@ -75,9 +75,9 @@ class VoiceConnection extends EventEmitter {
/**
* Our current video state
* @type {boolean}
* @type {boolean | null}
*/
this.videoStatus = false;
this.videoStatus = null;
/**
* The authentication data needed to connect to the voice server
@@ -98,7 +98,7 @@ class VoiceConnection extends EventEmitter {
* @event VoiceConnection#debug
* @param {string} message The debug message
*/
this.emit('debug', `audio player - ${m}`);
this.emit('debug', `media player - ${m}`);
});
this.player.on('error', e => {
@@ -233,38 +233,54 @@ class VoiceConnection extends EventEmitter {
* @param {boolean} value Video on or off
*/
setVideoStatus(value) {
if (this.status !== VoiceStatus.CONNECTED) return;
if (value === this.videoStatus) return;
if (this.status !== VoiceStatus.CONNECTED) return;
this.videoStatus = value;
this.sockets.ws
.sendPacket({
op: VoiceOpcodes.SOURCES,
d: {
audio_ssrc: this.authentication.ssrc,
video_ssrc: value ? this.authentication.ssrc + 1 : 0,
rtx_ssrc: value ? this.authentication.ssrc + 2 : 0,
streams: [
{
type: 'video',
rid: '100',
ssrc: value ? this.authentication.ssrc + 1 : 0,
active: true,
quality: 100,
rtx_ssrc: value ? this.authentication.ssrc + 2 : 0,
max_bitrate: 8000000,
max_framerate: 60,
max_resolution: {
type: 'source',
width: 0,
height: 0,
if (!value) {
this.sockets.ws
.sendPacket({
op: VoiceOpcodes.SOURCES,
d: {
audio_ssrc: this.authentication.ssrc,
video_ssrc: 0,
rtx_ssrc: 0,
streams: [],
},
})
.catch(e => {
this.emit('debug', e);
});
} else {
this.sockets.ws
.sendPacket({
op: VoiceOpcodes.SOURCES,
d: {
audio_ssrc: this.authentication.ssrc,
video_ssrc: this.authentication.ssrc + 1,
rtx_ssrc: this.authentication.ssrc + 2,
streams: [
{
type: 'video',
rid: '100',
ssrc: this.authentication.ssrc + 1,
active: true,
quality: 100,
rtx_ssrc: this.authentication.ssrc + 2,
max_bitrate: 8000000,
max_framerate: 60,
max_resolution: {
type: 'source',
width: 0,
height: 0,
},
},
},
],
},
})
.catch(e => {
this.emit('debug', e);
});
],
},
})
.catch(e => {
this.emit('debug', e);
});
}
}
/**
@@ -926,9 +942,9 @@ class StreamConnection extends VoiceConnection {
/**
* Stream state
* @type {boolean}
* @type {boolean | null}
*/
this.isPaused = false;
this.isPaused = null;
/**
* Viewer IDs

View File

@@ -28,7 +28,7 @@ class AnnexBDispatcher extends VideoDispatcher {
this._nalFunctions = nalFunctions;
}
codecCallback(frame) {
_codecCallback(frame) {
let accessUnit = frame;
let offset = 0;
@@ -43,7 +43,7 @@ class AnnexBDispatcher extends VideoDispatcher {
this._playChunk(Buffer.concat([this.createPayloadExtension(), nalu]), isLastNal);
} else {
const [naluHeader, naluData] = this._nalFunctions.splitHeader(nalu);
const dataFragments = this.partitionVideoData(naluData);
const dataFragments = this.partitionMtu(naluData);
// Send as Fragmentation Unit A (FU-A):
for (let fragmentIndex = 0; fragmentIndex < dataFragments.length; fragmentIndex++) {
const data = dataFragments[fragmentIndex];

View File

@@ -5,6 +5,8 @@ const Util = require('../../../util/Util');
const Silence = require('../util/Silence');
const VolumeInterface = require('../util/VolumeInterface');
const CHANNELS = 2;
/**
* @external WritableStream
* @see {@link https://nodejs.org/api/stream.html#stream_class_stream_writable}
@@ -37,6 +39,22 @@ class AudioDispatcher extends BaseDispatcher {
if (typeof plp !== 'undefined') this.setPLP(plp);
}
get TIMESTAMP_INC() {
return 480 * CHANNELS;
}
get FRAME_LENGTH() {
return 20;
}
/**
* Get the type of the dispatcher
* @returns {'audio'}
*/
getTypeDispatcher() {
return 'audio';
}
/**
* Set the bitrate of the current Opus encoder if using a compatible Opus stream.
* @param {number} value New bitrate, in kbps
@@ -103,6 +121,17 @@ class AudioDispatcher extends BaseDispatcher {
return true;
}
/**
* Sync with another video dispatcher to ensure that the audio and video are played at the same time.
* @param {VideoDispatcher} otherDispatcher The video dispatcher to sync with
*/
setSyncVideoDispatcher(otherDispatcher) {
if (otherDispatcher.getTypeDispatcher() !== 'video') {
throw new Error('Dispatcher must be a video dispatcher');
}
this._syncDispatcher = otherDispatcher;
}
// Volume stubs for docs
/* eslint-disable no-empty-function*/
get volumeDecibels() {}

View File

@@ -6,12 +6,10 @@ const { Writable } = require('node:stream');
const { setTimeout } = require('node:timers');
const secretbox = require('../util/Secretbox');
const CHANNELS = 2;
const MAX_UINT_16 = 2 ** 16 - 1;
const MAX_UINT_32 = 2 ** 32 - 1;
const extensions = [{ id: 5, len: 2, val: 0 }];
const extensions = [{ id: 5, length: 2, value: 0 }];
/**
* @external WritableStream
@@ -28,7 +26,7 @@ class BaseDispatcher extends Writable {
});
this.streams = streams;
/**
* The Audio Player that controls this dispatcher
* The Player that controls this dispatcher
* @type {MediaPlayer}
*/
this.player = player;
@@ -52,14 +50,6 @@ class BaseDispatcher extends Writable {
this.sequence = 0;
this.timestamp = 0;
/**
* Video FPS
* @type {number}
*/
this.fps = 0;
this.mtu = 1200;
const streamError = (type, err) => {
/**
* Emitted when the dispatcher encounters an error.
@@ -86,6 +76,10 @@ class BaseDispatcher extends Writable {
});
}
getTypeDispatcher() {
return 'base';
}
resetNonceBuffer() {
this._nonceBuffer =
this.player.voiceConnection.authentication.mode === 'aead_aes256_gcm_rtpsize'
@@ -93,25 +87,6 @@ class BaseDispatcher extends Writable {
: Buffer.alloc(24);
}
get TIMESTAMP_INC() {
return this.extensionEnabled ? 90000 / this.fps : 480 * CHANNELS;
}
get FRAME_LENGTH() {
return this.extensionEnabled ? 1000 / this.fps : 20;
}
partitionVideoData(data) {
const out = [];
const dataLength = data.length;
for (let i = 0; i < dataLength; i += this.mtu) {
out.push(data.slice(i, i + this.mtu));
}
return out;
}
getNewSequence() {
const currentSeq = this.sequence;
this.sequence++;
@@ -128,8 +103,20 @@ class BaseDispatcher extends Writable {
this.emit('start');
this.startTime = performance.now();
}
if (this.extensionEnabled) {
this.codecCallback(chunk);
if (this._syncDispatcher && !this._syncDispatcher.startTime) {
this.pause();
const cb = () => {
this.resume();
clearTimeout(timeout);
};
this._syncDispatcher.once('start', cb);
let timeout = setTimeout(() => {
this.removeListener('start', cb);
this.resume();
}, 10_000).unref();
}
if (this.getTypeDispatcher() === 'video') {
this._codecCallback(chunk);
} else {
this._playChunk(chunk);
}
@@ -166,8 +153,7 @@ class BaseDispatcher extends Writable {
this.streams.ffmpeg.pause();
this.streams.video.unpipe(this);
}
if (!this.extensionEnabled) {
// Audio
if (this.getTypeDispatcher() === 'audio') {
if (silence) {
this.streams.silence.pipe(this);
this._silence = true;
@@ -201,7 +187,7 @@ class BaseDispatcher extends Writable {
*/
resume() {
if (!this.pausedSince) return;
if (!this.extensionEnabled) this.streams.silence.unpipe(this);
if (this.getTypeDispatcher() === 'audio') this.streams.silence.unpipe(this);
if (this.streams.opus) this.streams.opus.pipe(this);
if (this.streams.video) {
this.streams.ffmpeg.resume();
@@ -246,14 +232,15 @@ class BaseDispatcher extends Writable {
callback();
}
_playChunk(chunk, isLastPacket) {
_playChunk(chunk, isLastPacket = false) {
if (
(this.player.dispatcher !== this && this.player.videoDispatcher !== this) ||
!this.player.voiceConnection.authentication.secret_key
) {
return;
}
this[this.extensionEnabled ? '_sendVideoPacket' : '_sendPacket'](this._createPacket(chunk, isLastPacket));
const packet = this._createPacket(chunk, isLastPacket);
if (packet) this._sendPacket(packet);
}
/**
@@ -263,11 +250,11 @@ class BaseDispatcher extends Writable {
*/
createHeaderExtension() {
/**
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| defined by profile | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| defined by profile | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
const profile = Buffer.alloc(4);
profile[0] = 0xbe;
profile[1] = 0xde;
@@ -277,8 +264,9 @@ class BaseDispatcher extends Writable {
}
/**
* Creates a single extension of type playout-delay
* Discord seems to send this extension on every video packet
* Creates a one-byte extension header & a single extension of type playout-delay
* @see https://docs.discord.sex/topics/voice-connections#sending-and-receiving-voice
* Discord expects a playout delay RTP extension header on every video packet.
* @see https://webrtc.googlesource.com/src/+/refs/heads/main/docs/native-code/rtp-hdrext/playout-delay
* @returns {Buffer} playout-delay extension <Buffer 51 00 00 00>
*/
@@ -289,23 +277,28 @@ class BaseDispatcher extends Writable {
* EXTENSION DATA - each extension payload is 32 bits
*/
const data = Buffer.alloc(4);
/**
* 0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+
| ID | len |
+-+-+-+-+-+-+-+-+
where len = actual length - 1
*/
data[0] = (ext.id & 0b00001111) << 4;
data[0] |= (ext.len - 1) & 0b00001111;
/** Specific to type playout-delay
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| MIN delay | MAX delay |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
data.writeUIntBE(ext.val, 1, 2); // Not quite but its 0 anyway
// https://webrtc.googlesource.com/src/+/refs/heads/main/docs/native-code/rtp-hdrext/playout-delay
if (ext.id === 5) {
/**
* 0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+
| ID | len |
+-+-+-+-+-+-+-+-+
where len = actual length - 1
/
data[0] = (ext.id & 0b00001111) << 4;
data[0] |= (ext.len - 1) & 0b00001111;
/** Specific to type playout-delay
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| MIN delay | MAX delay |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
data.writeUIntBE(ext.value, 1, 2); // Not quite but its 0 anyway
}
extensionsData.push(data);
}
return Buffer.concat(extensionsData);
@@ -352,24 +345,50 @@ class BaseDispatcher extends Writable {
}
}
_createPacket(buffer, isLastPacket = false) {
_createPacket(buffer, isLastPacket) {
/*
// Packet is raw rtp from ffmpeg
const rtp = webrtc.RtpPacket.deSerialize(buffer);
if (!rtp.payload) {
console.log('no payload', rtp);
return null;
}
// Header
const rtpHeader = this.extensionEnabled
? Buffer.concat([Buffer.alloc(12), this.createHeaderExtension()])
: Buffer.alloc(12); // RTP_HEADER_SIZE
rtpHeader[0] = this.extensionEnabled ? 0x90 : 0x80; // Version + Flags (1 byte)
// https://docs.discord.sex/topics/voice-connections#rtp-packet-structure
let rtpHeader = buffer.slice(0, 12); // RTP_HEADER_SIZE
rtpHeader[0] = 0x80; // Version + Flags (1 byte)
rtpHeader[1] = this.payloadType; // Payload Type (1 byte)
if (this.extensionEnabled) {
if (isLastPacket) {
rtpHeader[1] |= 0x80;
}
rtpHeader = Buffer.concat([rtpHeader, this.createHeaderExtension()]);
rtpHeader[0] |= 1 << 4; // 0x90
}
rtpHeader.writeUIntBE(this.getNewSequence(), 2, 2);
rtpHeader.writeUIntBE(this.timestamp, 4, 4);
rtpHeader.writeUIntBE(
this.player.voiceConnection.authentication.ssrc + Number(this.getTypeDispatcher() === 'video'),
8,
4,
);
*/
// Header
let rtpHeader = Buffer.alloc(12); // RTP_HEADER_SIZE
rtpHeader[0] = 0x80; // Version + Flags (1 byte)
rtpHeader[1] = this.payloadType; // Payload Type (1 byte)
if (this.extensionEnabled) {
rtpHeader = Buffer.concat([rtpHeader, this.createHeaderExtension()]);
rtpHeader[0] |= 1 << 4; // 0x90
}
if (this.getTypeDispatcher() === 'video' && isLastPacket) {
rtpHeader[1] |= 1 << 7; // Marker bit
}
rtpHeader.writeUIntBE(this.getNewSequence(), 2, 2);
rtpHeader.writeUIntBE(this.timestamp, 4, 4);
rtpHeader.writeUIntBE(this.player.voiceConnection.authentication.ssrc + this.extensionEnabled, 8, 4);
rtpHeader.writeUIntBE(
this.player.voiceConnection.authentication.ssrc + Number(this.getTypeDispatcher() === 'video'),
8,
4,
);
return Buffer.concat([rtpHeader, ...this._encrypt(buffer, rtpHeader)]);
}
@@ -379,31 +398,27 @@ class BaseDispatcher extends Writable {
* @event BaseDispatcher#debug
* @param {string} info The debug info
*/
this._setSpeaking(this.player.isScreenSharing ? 1 << 1 : 1 << 0); // 1 << 0 = SPEAKING, 1 << 1 = SOUND SHARE
if (this.getTypeDispatcher() === 'audio') {
this._setSpeaking(this.player.isScreenSharing ? 1 << 1 : 1 << 0); // 1 << 0 = SPEAKING, 1 << 1 = SOUND SHARE
} else if (this.getTypeDispatcher() === 'video') {
this._setVideoStatus(true);
this._setStreamStatus(false);
}
if (!this.player.voiceConnection.sockets.udp) {
this.emit('debug', 'Failed to send a packet - no UDP socket');
return;
}
this.player.voiceConnection.sockets.udp.send(packet).catch(e => {
this._setSpeaking(0);
if (this.getTypeDispatcher() === 'audio') {
this._setSpeaking(this._setSpeaking(0));
} else if (this.getTypeDispatcher() === 'video') {
this._setVideoStatus(false);
this._setStreamStatus(true);
}
this.emit('debug', `Failed to send a packet - ${e}`);
});
}
_sendVideoPacket(packet) {
this._setVideoStatus(true);
this._setStreamStatus(false);
if (!this.player.voiceConnection.sockets.udp) {
this.emit('debug', 'Failed to send a video packet - no UDP socket');
return;
}
this.player.voiceConnection.sockets.udp.send(packet).catch(e => {
this._setVideoStatus(false);
this._setStreamStatus(true);
this.emit('debug', `Failed to send a video packet - ${e}`);
});
}
_setSpeaking(value) {
if (typeof this.player.voiceConnection !== 'undefined') {
this.player.voiceConnection.setSpeaking(value);

View File

@@ -26,25 +26,25 @@ class VP8Dispatcher extends VideoDispatcher {
super(player, highWaterMark, streams, fps, Util.getPayloadType('VP8'));
}
makeChunk(buffer, isFirstFrame) {
makeChunk(buffer, isFirstPacket) {
// Vp8 payload descriptor
const payloadDescriptorBuf = Buffer.alloc(2);
payloadDescriptorBuf[0] = isFirstFrame ? 0x90 : 0x80; // Mark S bit, indicates start of frame: payloadDescriptorBuf[0] |= 0b00010000;
payloadDescriptorBuf[0] = 0x80;
payloadDescriptorBuf[1] = 0x80;
if (isFirstPacket) {
payloadDescriptorBuf[0] |= 1 << 4; // Mark S bit, indicates start of frame
}
// Vp8 pictureid payload extension
const pictureIdBuf = Buffer.alloc(2);
pictureIdBuf.writeUintBE(this.count, 0, 2);
pictureIdBuf[0] |= 0x80;
return Buffer.concat([payloadDescriptorBuf, pictureIdBuf, buffer]);
return Buffer.concat([this.createPayloadExtension(), payloadDescriptorBuf, pictureIdBuf, buffer]);
}
codecCallback(chunk) {
const chunkSplit = this.partitionVideoData(chunk);
_codecCallback(chunk) {
const chunkSplit = this.partitionMtu(chunk).map((c, i) => this.makeChunk(c, i === 0));
for (let i = 0; i < chunkSplit.length; i++) {
this._playChunk(
Buffer.concat([this.createPayloadExtension(), this.makeChunk(chunkSplit[i], i == 0)]),
i + 1 === chunkSplit.length,
);
this._playChunk(chunkSplit[i], i + 1 === chunkSplit.length);
}
}
}

View File

@@ -16,7 +16,40 @@ const BaseDispatcher = require('./BaseDispatcher');
class VideoDispatcher extends BaseDispatcher {
constructor(player, highWaterMark = 12, streams, fps, payloadType) {
super(player, highWaterMark, payloadType, true, streams);
/**
* Video FPS
* @type {number}
*/
this.fps = fps;
this.mtu = 1200;
}
get TIMESTAMP_INC() {
return 90000 / this.fps;
}
get FRAME_LENGTH() {
return 1000 / this.fps;
}
/**
* Get the type of the dispatcher
* @returns {'video'}
*/
getTypeDispatcher() {
return 'video';
}
partitionMtu(data) {
const out = [];
const dataLength = data.length;
for (let i = 0; i < dataLength; i += this.mtu) {
out.push(data.slice(i, i + this.mtu));
}
return out;
}
/**
@@ -26,6 +59,10 @@ class VideoDispatcher extends BaseDispatcher {
setFPSSource(value) {
this.fps = value;
}
_codecCallback() {
throw new Error('The _codecCallback method must be implemented');
}
}
module.exports = VideoDispatcher;

View File

@@ -26,6 +26,8 @@ class VoiceWebSocket extends EventEmitter {
*/
this.attempts = 0;
this._sequenceNumber = -1;
this.dead = false;
this.connection.on('closing', this.shutdown.bind(this));
}
@@ -75,7 +77,7 @@ class VoiceWebSocket extends EventEmitter {
* The actual WebSocket used to connect to the Voice WebSocket Server.
* @type {WebSocket}
*/
this.ws = WebSocket.create(`wss://${this.connection.authentication.endpoint}/`, { v: 7 });
this.ws = WebSocket.create(`wss://${this.connection.authentication.endpoint}/`, { v: 8 });
this.emit('debug', `[WS] connecting, ${this.attempts} attempts, ${this.ws.url}`);
this.ws.onopen = this.onOpen.bind(this);
this.ws.onmessage = this.onMessage.bind(this);
@@ -144,9 +146,10 @@ class VoiceWebSocket extends EventEmitter {
/**
* Called whenever the connection to the WebSocket server is lost.
* @param {CloseEvent} event The WebSocket close event
*/
onClose() {
this.emit('debug', `[WS] closed`);
onClose(event) {
this.emit('debug', `[WS] closed with code ${event.code} and reason: ${event.reason}`);
if (!this.dead) setTimeout(this.connect.bind(this), this.attempts * 1000).unref();
}
@@ -165,6 +168,7 @@ class VoiceWebSocket extends EventEmitter {
*/
onPacket(packet) {
this.emit('debug', `[WS] << ${JSON.stringify(packet)}`);
if (packet.seq) this._sequenceNumber = packet.seq;
switch (packet.op) {
case VoiceOpcodes.HELLO:
this.setHeartbeat(packet.d.heartbeat_interval);
@@ -266,7 +270,13 @@ class VoiceWebSocket extends EventEmitter {
* Sends a heartbeat packet.
*/
sendHeartbeat() {
this.sendPacket({ op: VoiceOpcodes.HEARTBEAT, d: Math.floor(Math.random() * 10e10) }).catch(() => {
this.sendPacket({
op: VoiceOpcodes.HEARTBEAT,
d: {
t: Date.now(),
seq_ack: this._sequenceNumber,
},
}).catch(() => {
this.emit('warn', 'Tried to send heartbeat, but connection is not open');
this.clearHeartbeat();
});

View File

@@ -20,7 +20,7 @@ Please use the @dank074/discord-video-stream library for the best support.
const EventEmitter = require('events');
const { Readable: ReadableStream } = require('stream');
const prism = require('prism-media');
const { H264NalSplitter } = require('./processing/AnnexBNalSplitter');
const { H264NalSplitter, H265NalSplitter } = require('./processing/AnnexBNalSplitter');
const { IvfTransformer } = require('./processing/IvfSplitter');
const { H264Dispatcher } = require('../dispatcher/AnnexBDispatcher');
const AudioDispatcher = require('../dispatcher/AudioDispatcher');
@@ -62,6 +62,19 @@ const FFMPEG_H264_ARGUMENTS = options => [
'h264_metadata=aud=insert',
];
const FFMPEG_H265_ARGUMENTS = options => [
'-c:v',
'libx265',
'-f',
'hevc',
'-preset',
options?.presetH265 || 'faster',
'-profile:v',
'main',
'-bf',
'0',
];
/**
* Player for a Voice Connection.
* @private
@@ -188,15 +201,19 @@ class MediaPlayer extends EventEmitter {
}
// Get stream type
if (this.voiceConnection.videoCodec == 'VP8') {
if (this.voiceConnection.videoCodec === 'VP8') {
args.push(...FFMPEG_VP8_ARGUMENTS);
// Remove '-speed', '5' bc bad quality
}
if (this.voiceConnection.videoCodec == 'H264') {
if (this.voiceConnection.videoCodec === 'H264') {
args.push(...FFMPEG_H264_ARGUMENTS(options));
}
if (this.voiceConnection.videoCodec === 'H265') {
args.push(...FFMPEG_H265_ARGUMENTS(options));
}
args.push('-force_key_frames', '00:02');
if (options?.inputFFmpegArgs) {
@@ -229,7 +246,7 @@ class MediaPlayer extends EventEmitter {
return this.playAnnexBVideo(ffmpeg, options, streams, 'H264');
}
default: {
throw new Error('Invalid codec (Supported: VP8, H264)');
throw new Error('Invalid codec (Supported: VP8, H264, H265)');
}
}
}
@@ -247,7 +264,12 @@ class MediaPlayer extends EventEmitter {
// eslint-disable-next-line no-unused-vars
playAnnexBVideo(stream, options, streams, type) {
this.destroyVideoDispatcher();
const videoStream = new H264NalSplitter();
let videoStream;
if (type === 'H264') {
videoStream = new H264NalSplitter();
} else if (type === 'H265') {
videoStream = new H265NalSplitter();
}
stream.pipe(videoStream);
streams.video = videoStream;
const dispatcher = this.createVideoDispatcher(options, streams);

View File

@@ -0,0 +1,37 @@
'use strict';
const { Buffer } = require('node:buffer');
const { Transform } = require('node:stream');
class PCMInsertSilence extends Transform {
constructor(options) {
super(options);
// 48Khz, 2 channels, 16-bit (2 bytes per channel)
this.sampleRate = 48000;
this.channels = 2;
// 4 bytes per frame (2 channels * 2 bytes)
this.bytesPerFrame = this.channels * 2;
this.lastChunkTime = Date.now();
this.silenceThresholdMs = 50;
}
_transform(chunk, encoding, callback) {
const now = Date.now();
const gap = now - this.lastChunkTime;
if (gap >= this.silenceThresholdMs) {
const missingFrames = Math.floor((gap / 1000) * this.sampleRate);
const silenceBuffer = Buffer.alloc(missingFrames * this.bytesPerFrame, 0);
this.push(silenceBuffer);
}
this.lastChunkTime = now;
this.push(chunk);
callback();
}
}
module.exports = {
PCMInsertSilence,
};

View File

@@ -3,10 +3,11 @@
const EventEmitter = require('events');
const { Buffer } = require('node:buffer');
const crypto = require('node:crypto');
const { nextTick } = require('node:process');
const { setTimeout } = require('node:timers');
const FFmpegHandler = require('./FFmpegHandler');
const { RtpPacket } = require('werift-rtp');
const Recorder = require('./Recorder');
const Speaking = require('../../../util/Speaking');
const Util = require('../../../util/Util');
const secretbox = require('../util/Secretbox');
const { SILENCE_FRAME } = require('../util/Silence');
@@ -14,7 +15,7 @@ const { SILENCE_FRAME } = require('../util/Silence');
// https://github.com/discordjs/discord.js/issues/3524#issuecomment-540373200
const DISCORD_SPEAKING_DELAY = 250;
const HEADER_EXTENSION_BYTE = Buffer.from([0xbe, 0xde]);
// Unused: const HEADER_EXTENSION_BYTE = Buffer.from([0xbe, 0xde]);
const UNPADDED_NONCE_LENGTH = 4;
const AUTH_TAG_LENGTH = 16;
@@ -57,16 +58,21 @@ class PacketHandler extends EventEmitter {
return stream;
}
makeVideoStream(user, portUdp, codec, output, isEnableAudio = false) {
makeVideoStream(user, output) {
if (this.videoStreams.has(user)) return this.videoStreams.get(user);
const stream = new FFmpegHandler(this, user, codec, portUdp, output, isEnableAudio);
const stream = new Recorder(this, {
userId: user,
output,
portUdpH264: 65506,
portUdpOpus: 65510,
});
stream.on('ready', () => {
this.videoStreams.set(user, stream);
});
return stream;
}
parseBuffer(buffer, shouldReturnTuple = false) {
parseBuffer(buffer) {
const { secret_key, mode } = this.receiver.connection.authentication;
// Open packet
if (!secret_key) return new Error('secret_key cannot be null or undefined');
@@ -115,30 +121,21 @@ class PacketHandler extends EventEmitter {
}
}
if (shouldReturnTuple) {
return [header, packet];
}
/*
// Strip decrypted RTP Header Extension if present
if (buffer.slice(12, 14).compare(HEADER_EXTENSION_BYTE) === 0) {
const headerExtensionLength = buffer.slice(14).readUInt16BE();
packet = packet.subarray(4 * headerExtensionLength);
}
*/
return packet;
return RtpPacket.deSerialize(Buffer.concat([header, packet]));
}
audioReceiver(buffer) {
const ssrc = buffer.readUInt32BE(8);
const userStat = this.connection.ssrcMap.get(ssrc);
if (!userStat) return;
let opusPacket;
audioReceiver(ssrc, userStat, opusPacket) {
const streamInfo = this.streams.get(userStat.userId);
// If the user is in video, we need to check if the packet is just silence
if (userStat.hasVideo) {
opusPacket = this.parseBuffer(buffer);
if (opusPacket instanceof Error) {
// Only emit an error if we were actively receiving packets from this user
if (streamInfo) {
@@ -146,7 +143,14 @@ class PacketHandler extends EventEmitter {
}
return;
}
if (SILENCE_FRAME.equals(opusPacket)) {
// Check payload type
if (opusPacket.header.payloadType !== Util.getPayloadType('opus')) {
return;
}
if (!opusPacket.payload) {
return;
}
if (SILENCE_FRAME.equals(opusPacket.payload)) {
// If this is a silence frame, pretend we never received it
return;
}
@@ -176,65 +180,68 @@ class PacketHandler extends EventEmitter {
if (streamInfo) {
const { stream } = streamInfo;
if (!opusPacket) {
opusPacket = this.parseBuffer(buffer);
if (opusPacket instanceof Error) {
this.emit('error', opusPacket);
return;
}
if (opusPacket instanceof Error) {
this.emit('error', opusPacket);
return;
}
if (opusPacket === null) {
// ! null marks EOF for stream
nextTick(() => this.destroy());
if (opusPacket.header.payloadType !== Util.getPayloadType('opus')) {
return;
}
stream.push(opusPacket);
stream.push(opusPacket.payload);
}
}
audioReceiverForStream(buffer) {
const ssrc = buffer.readUInt32BE(8);
const userStat = this.connection.ssrcMap.get(ssrc); // Audio_ssrc
if (!userStat) return;
audioReceiverForStream(ssrc, userStat, packet) {
const streamInfo = this.videoStreams.get(userStat.userId);
if (!streamInfo) return;
const packet = this.parseBuffer(buffer, true);
if (packet instanceof Error) {
return;
}
if (streamInfo.isEnableAudio) {
streamInfo.sendPayloadToFFmpeg(Buffer.concat(packet), true);
if (packet.header.payloadType !== Util.getPayloadType('opus')) {
return;
}
streamInfo.feed(packet);
}
videoReceiver(buffer) {
const ssrc = buffer.readUInt32BE(8);
const userStat = this.connection.ssrcMap.get(ssrc - 1); // Video_ssrc
if (!userStat) return;
/**
* Test
* @param {number} ssrc ssrc
* @param {Object} userStat { userId, hasVideo }
* @param {RtpPacket} packet RtpPacket
* @returns {void}
*/
videoReceiver(ssrc, userStat, packet) {
const streamInfo = this.videoStreams.get(userStat.userId);
// If the user is in video, we need to check if the packet is just silence
if (userStat.hasVideo) {
const packet = this.parseBuffer(buffer, true);
if (packet instanceof Error) {
return;
}
let [header, videoPacket] = packet;
if (SILENCE_FRAME.equals(videoPacket)) {
// If this is a silence frame, pretend we never received it
if (packet.header.payloadType === Util.getPayloadType('opus')) {
return;
}
this.receiver.emit('videoData', ssrc - 1, userStat, header, videoPacket);
if (streamInfo) {
streamInfo.sendPayloadToFFmpeg(Buffer.concat(packet));
streamInfo.feed(packet);
}
}
}
push(buffer) {
this.audioReceiver(buffer);
this.videoReceiver(buffer);
this.audioReceiverForStream(buffer);
const ssrc = buffer.readUInt32BE(8);
let userStat, packet;
if (this.connection.ssrcMap.has(ssrc)) {
userStat = this.connection.ssrcMap.get(ssrc); // Audio_ssrc
packet = this.parseBuffer(buffer);
this.audioReceiver(ssrc, userStat, packet);
this.audioReceiverForStream(ssrc, userStat, packet);
} else if (this.connection.ssrcMap.has(ssrc - 1)) {
userStat = this.connection.ssrcMap.get(ssrc - 1); // Video_ssrc
packet = this.parseBuffer(buffer);
this.videoReceiver(ssrc, userStat, packet);
}
if (userStat && !(packet instanceof Error)) this.receiver.emit('receiverData', userStat, packet);
}
// When udp connection is closed (STREAM_DELETE), destroy all streams (Memory leak)

View File

@@ -4,6 +4,7 @@ const EventEmitter = require('events');
const prism = require('prism-media');
const PacketHandler = require('./PacketHandler');
const { Error } = require('../../../errors');
const { PCMInsertSilence } = require('../player/processing/PCMInsertSilence');
/**
* Receives audio packets from a voice connection.
@@ -33,6 +34,8 @@ class VoiceReceiver extends EventEmitter {
* audio
* @property {string} [end='silence'] When the stream should be destroyed. If `silence`, this will be when the user
* stops talking. Otherwise, if `manual`, this should be handled by you.
* @property {boolean} [paddingSilence=false] Whether to add silence padding
* If 'end' is set to 'silence', this property automatically defaults to `false`
*/
/**
@@ -42,49 +45,51 @@ class VoiceReceiver extends EventEmitter {
* @param {ReceiveStreamOptions} options Options.
* @returns {ReadableStream}
*/
createStream(user, { mode = 'opus', end = 'silence' } = {}) {
createStream(user, { mode = 'opus', end = 'silence', paddingSilence = false } = {}) {
user = this.connection.client.users.resolve(user);
if (end === 'silence') paddingSilence = false;
if (!user) throw new Error('VOICE_USER_MISSING');
const stream = this.packets.makeStream(user.id, end);
if (mode === 'pcm') {
const stream = this.packets.makeStream(user.id, end); // Opus stream
if (paddingSilence) {
const decoder = new prism.opus.Decoder({ channels: 2, rate: 48000, frameSize: 960 });
stream.pipe(decoder);
return decoder;
const pcmTransformer = new PCMInsertSilence();
stream.pipe(decoder).pipe(pcmTransformer);
if (mode === 'opus') {
const encoder = new prism.opus.Encoder({ channels: 2, rate: 48000, frameSize: 960 });
pcmTransformer.pipe(encoder);
return encoder;
}
return pcmTransformer;
} else {
if (mode === 'pcm') {
const decoder = new prism.opus.Decoder({ channels: 2, rate: 48000, frameSize: 960 });
stream.pipe(decoder);
return decoder;
}
return stream;
}
return stream;
}
/**
* Options passed to `VoiceReceiver#createVideoStream`.
* @typedef {Object} ReceiveVideoStreamOptions
* @property {number} portUdp The UDP port to use for the video stream (local stream).
* @property {WritableStream|string} output Output stream or file path to write the video stream to.
* @property {boolean} [isEnableAudio=false] Enable audio for the video stream.
* <info>If you intend to record the stream with audio, make sure that `portUdp` and `portUdp + 2` are not in use.</info>
*/
/**
* Creates a new video receiving stream. If a stream already exists for a user, then that stream will be returned
* rather than generating a new one.
* <info>Proof of concept - Requires a very good internet connection</info>
* @param {UserResolvable} user The user to start listening to.
* @param {ReceiveVideoStreamOptions} options Options.
* @returns {FFmpegHandler} The video stream for the specified user.
* @param {WritableStream|string} output Output stream or file path to write the video stream to.
* @returns {Recorder} The video stream for the specified user.
*/
createVideoStream(user, { portUdp, output, isEnableAudio = false } = {}) {
createVideoStream(user, output) {
user = this.connection.client.users.resolve(user);
if (!user) throw new Error('VOICE_USER_MISSING');
const stream = this.packets.makeVideoStream(user.id, portUdp, 'H264', output, isEnableAudio);
const stream = this.packets.makeVideoStream(user.id, output);
return stream;
}
/**
* Emitted whenever there is a video data (Raw)
* @event VoiceReceiver#videoData
* @param {number} ssrc SSRC
* Emitted whenever there is a data packet received.
* @event VoiceReceiver#receiverData
* @param {{ userId: Snowflake, hasVideo: boolean }} ssrcData SSRC Data
* @param {Buffer} header The unencrypted RTP header contains 12 bytes, Buffer<0xbe, 0xde> and the extension size
* @param {Buffer} packetDecrypt Decrypted contains the extension, if any, the video packet
* @param {RtpPacket} header RTP Packet
*/
}

View File

@@ -7,15 +7,17 @@ const { Buffer } = require('node:buffer');
const { Writable } = require('stream');
const find = require('find-process');
const kill = require('tree-kill');
const { RtpPacket } = require('werift-rtp');
const Util = require('../../../util/Util');
const { randomPorts } = require('../util/Function');
const { StreamOutput } = require('../util/Socket');
/**
* Represents a FFmpeg handler
* @extends {EventEmitter}
*/
class FFmpegHandler extends EventEmitter {
constructor(receiver, userId, codec, portUdp, output, isEnableAudio) {
class Recorder extends EventEmitter {
constructor(receiver, { userId, portUdpH264, portUdpOpus, output } = {}) {
super();
Object.defineProperty(this, 'receiver', { value: receiver });
@@ -26,27 +28,18 @@ class FFmpegHandler extends EventEmitter {
*/
this.userId = userId;
/**
* If the audio is enabled
* @type {boolean}
*/
this.isEnableAudio = isEnableAudio;
this.portUdpH264 = portUdpH264;
this.portUdpH265 = null;
this.portUdpOpus = portUdpOpus;
/**
* The codec of the stream
* @type {VideoCodec}
*/
this.codec = codec;
this.promise = null;
/**
* The UDP port to listen to
* @type {number}
*/
this.portUdp = portUdp;
const isStream = output instanceof Writable;
if (isStream) {
this.outputStream = StreamOutput(output);
if (!portUdpH264 || !portUdpOpus) {
this.promise = randomPorts(6, 'udp4').then(ports => {
ports = ports.filter(port => port % 2 === 0);
this.portUdpH264 ??= ports[0];
this.portUdpOpus ??= ports[1];
});
}
/**
@@ -55,35 +48,54 @@ class FFmpegHandler extends EventEmitter {
*/
this.output = output;
const sdpData = Util.getSDPCodecName(portUdp, this.isEnableAudio);
/**
* The FFmpeg process is ready or not
* @type {boolean}
*/
this.ready = false;
this.socket = createSocket('udp4');
this.init(output);
}
async init(output) {
await this.promise;
const sdpData = Util.getSDPCodecName(this.portUdpH264, this.portUdpH265, this.portUdpOpus);
const isStream = output instanceof Writable;
if (isStream) {
this.outputStream = StreamOutput(output);
}
const stream = spawn('ffmpeg', [
'-reorder_queue_size',
'50',
'500',
'-thread_queue_size',
'500',
'-err_detect',
'ignore_err',
'-flags2',
'+export_mvs',
'-fflags',
'+genpts',
'-fflags',
'+discardcorrupt',
'+genpts+discardcorrupt',
'-use_wallclock_as_timestamps',
'1',
'-f',
'sdp',
'-analyzeduration',
'1M',
'-probesize',
'1M',
'-protocol_whitelist',
'file,udp,rtp,pipe,fd',
'-i',
'-', // Read from stdin
'-buffer_size',
'1000000',
'4M',
'-max_delay',
'500000',
'500000', // 500ms
'-rtbufsize',
'4M',
'-c',
'copy',
'-y',
'-f',
'matroska',
@@ -102,33 +114,36 @@ class FFmpegHandler extends EventEmitter {
this.ready = true;
this.emit('ready');
});
this.socket = createSocket('udp4');
this.socketAudio = createSocket('udp4');
}
/**
* Send a payload to FFmpeg via UDP
* @param {Buffer} payload The payload
* @param {boolean} isAudio If the payload is audio
* @param {RtpPacket|string|Buffer} payload The payload
* @param {*} callback Callback
*/
sendPayloadToFFmpeg(
feed(
payload,
isAudio = false,
callback = e => {
if (e) {
console.error('Error sending packet:', e);
}
},
) {
const message = Buffer.from(payload);
if (isAudio && !this.isEnableAudio) {
if (!(payload instanceof RtpPacket)) {
payload = RtpPacket.deSerialize(Buffer.isBuffer(payload) ? payload : Buffer.from(payload));
}
const message = payload.serialize();
// Get port from payloadType
let port;
if (payload.header.payloadType === Util.getPayloadType('opus')) {
port = this.portUdpOpus;
} else if (payload.header.payloadType === Util.getPayloadType('H264')) {
port = this.portUdpH264;
} else if (payload.header.payloadType === Util.getPayloadType('H265')) {
port = this.portUdpH265;
} else {
return;
}
if (isAudio) {
this.socketAudio.send(message, 0, message.length, this.portUdp + 2, '127.0.0.1', callback);
} else {
this.socket.send(message, 0, message.length, this.portUdp, '127.0.0.1', callback);
}
this.socket.send(message, 0, message.length, port, '127.0.0.1', callback);
}
destroy() {
@@ -138,21 +153,21 @@ class FFmpegHandler extends EventEmitter {
let process = list.find(o => o.pid === ffmpegPid || o.ppid === ffmpegPid || o.cmd.includes(args));
if (process) {
kill(process.pid);
this.receiver.videoStreams.delete(this.userId);
this.receiver?.videoStreams?.delete(this.userId);
this.emit('closed');
}
});
}
/**
* Emitted when the FFmpegHandler becomes ready to start working.
* @event FFmpegHandler#ready
* Emitted when the Recorder becomes ready to start working.
* @event Recorder#ready
*/
/**
* Emitted when the FFmpegHandler is closed.
* @event FFmpegHandler#closed
* Emitted when the Recorder is closed.
* @event Recorder#closed
*/
}
module.exports = FFmpegHandler;
module.exports = Recorder;

View File

@@ -1,5 +1,103 @@
'use strict';
const dgram = require('dgram');
const { setImmediate } = require('node:timers');
/**
* @typedef {Object} InterfaceAddresses
* @property {string} [udp4] - IPv4 address
* @property {string} [udp6] - IPv6 address
*/
/**
* Get the interface address for a given socket type.
* @param {"udp4"|"udp6"} type - The socket type.
* @param {InterfaceAddresses} [interfaceAddresses] - The interface addresses mapping.
* @returns {string|undefined} The interface address if available.
*/
function interfaceAddress(type, interfaceAddresses) {
return interfaceAddresses ? interfaceAddresses[type] : undefined;
}
/**
* Get a random available port.
* @param {"udp4"|"udp6"} [protocol="udp4"] - The socket type.
* @param {InterfaceAddresses} [interfaceAddresses] - The interface addresses mapping.
* @returns {Promise<number>} The assigned random port.
*/
async function randomPort(protocol = 'udp4', interfaceAddresses) {
const socket = dgram.createSocket(protocol);
setImmediate(() =>
socket.bind({
port: 0,
address: interfaceAddress(protocol, interfaceAddresses),
}),
);
await new Promise((resolve, reject) => {
socket.once('error', reject);
socket.once('listening', resolve);
});
const port = socket.address()?.port;
await new Promise(resolve => socket.close(resolve));
return port;
}
/**
* Get multiple random available ports.
* @param {number} num - Number of ports to find.
* @param {"udp4"|"udp6"} [protocol="udp4"] - The socket type.
* @param {InterfaceAddresses} [interfaceAddresses] - The interface addresses mapping.
* @returns {Promise<number[]>} An array of assigned random ports.
*/
async function randomPorts(num, protocol = 'udp4', interfaceAddresses) {
return Promise.all(Array.from({ length: num }).map(() => randomPort(protocol, interfaceAddresses)));
}
/**
* Find an available port within a given range.
* @param {number} min - The minimum port number.
* @param {number} max - The maximum port number.
* @param {"udp4"|"udp6"} [protocol="udp4"] - The socket type.
* @param {InterfaceAddresses} [interfaceAddresses] - The interface addresses mapping.
* @returns {Promise<number>} The available port within range.
* @throws {Error} If no port is found within the range.
*/
async function findPort(min, max, protocol = 'udp4', interfaceAddresses) {
let port;
for (let i = min; i <= max; i++) {
const socket = dgram.createSocket(protocol);
setImmediate(() =>
socket.bind({
port: i,
address: interfaceAddress(protocol, interfaceAddresses),
}),
);
const error = await new Promise(resolve => {
socket.once('error', resolve);
socket.once('listening', () => resolve(null));
});
await new Promise(resolve => socket.close(resolve));
if (error) continue;
const addressInfo = socket.address();
if (addressInfo && addressInfo.port >= min && addressInfo.port <= max) {
port = addressInfo.port;
break;
}
}
if (!port) throw new Error('port not found');
return port;
}
function parseStreamKey(key) {
const Arr = key.split(':');
const type = Arr[0];
@@ -10,5 +108,9 @@ function parseStreamKey(key) {
}
module.exports = {
randomPort,
randomPorts,
findPort,
interfaceAddress,
parseStreamKey,
};