feat: try implement djs voice + video (v12)

This commit is contained in:
Elysia
2024-07-24 19:27:50 +07:00
parent 7fa4666df0
commit 26aa85c126
31 changed files with 3768 additions and 9 deletions

View File

@@ -1,9 +1,13 @@
'use strict';
const { Collection } = require('@discordjs/collection');
const VoiceConnection = require('./VoiceConnection');
const { Error } = require('../../errors');
const { Events } = require('../../util/Constants');
/**
* Manages voice connections for the client
* Feat: Support both lib & djs/voice
*/
class ClientVoiceManager {
constructor(client) {
@@ -15,6 +19,12 @@ class ClientVoiceManager {
*/
Object.defineProperty(this, 'client', { value: client });
/**
* A collection mapping connection IDs to the Connection objects
* @type {Collection<Snowflake, VoiceConnection>}
*/
this.connections = new Collection();
/**
* Maps guild ids to voice adapters created for use with @discordjs/voice.
* @type {Map<Snowflake, Object>}
@@ -32,6 +42,16 @@ class ClientVoiceManager {
}
onVoiceServer(payload) {
const { guild_id, channel_id, token, endpoint } = payload;
this.client.emit(
'debug',
`[VOICE] voiceServer ${channel_id ? 'channel' : 'guild'}: ${
channel_id || guild_id
} token: ${token} endpoint: ${endpoint}`,
);
const connection = this.connections.get(guild_id || channel_id); // DMs Call
if (connection) connection.setTokenAndEndpoint(token, endpoint);
// Djs / voice
if (payload.guild_id) {
this.adapters.get(payload.guild_id)?.onVoiceServerUpdate(payload);
} else {
@@ -40,12 +60,89 @@ class ClientVoiceManager {
}
onVoiceStateUpdate(payload) {
const { guild_id, session_id, channel_id } = payload;
const connection = this.connections.get(guild_id || channel_id); // DMs Call
this.client.emit('debug', `[VOICE] connection? ${!!connection}, ${guild_id} ${session_id} ${channel_id}`);
if (!connection) return;
if (!channel_id) {
connection._disconnect();
this.connections.delete(guild_id || channel_id);
return;
}
const channel = this.client.channels.cache.get(channel_id);
if (channel) {
connection.channel = channel;
connection.setSessionId(session_id);
} else {
this.client.emit('debug', `[VOICE] disconnecting from guild ${guild_id} as channel ${channel_id} is uncached`);
connection.disconnect();
}
// Djs Voice
if (payload.guild_id && payload.session_id && payload.user_id === this.client.user?.id) {
this.adapters.get(payload.guild_id)?.onVoiceStateUpdate(payload);
} else if (payload.channel_id && payload.session_id && payload.user_id === this.client.user?.id) {
this.adapters.get(payload.channel_id)?.onVoiceStateUpdate(payload);
}
}
/**
* @property {boolean} [selfMute=false]
* @property {boolean} [selfDeaf=false]
* @property {boolean} [selfVideo=false]
* @property {VideoCodec} [videoCodec='H264']
* @typedef {Object} JoinChannelConfig
*/
/**
* Sets up a request to join a voice channel.
* @param {VoiceChannel} channel The voice channel to join
* @param {JoinChannelConfig} config Config to join voice channel
* @returns {Promise<VoiceConnection>}
*/
joinChannel(channel, config = {}) {
return new Promise((resolve, reject) => {
if (!['DM', 'GROUP_DM'].includes(channel.type) && !channel.joinable) {
throw new Error('VOICE_JOIN_CHANNEL', channel.full);
}
let connection = this.connections.get(channel.guild?.id || channel.id);
if (connection) {
if (connection.channel.id !== channel.id) {
this.connections.get(channel.guild?.id || channel.id).updateChannel(channel);
}
resolve(connection);
return;
} else {
connection = new VoiceConnection(this, channel);
if (config?.videoCodec) connection.setVideoCodec(config.videoCodec);
connection.on('debug', msg =>
this.client.emit('debug', `[VOICE (${channel.guild?.id || channel.id}:${connection.status})]: ${msg}`),
);
connection.authenticate({
self_mute: Boolean(config.selfMute),
self_deaf: Boolean(config.selfDeaf),
self_video: Boolean(config.selfVideo),
});
this.connections.set(channel.guild?.id || channel.id, connection);
}
connection.once('failed', reason => {
this.connections.delete(channel.guild?.id || channel.id);
reject(reason);
});
connection.on('error', reject);
connection.once('authenticated', () => {
connection.once('ready', () => {
resolve(connection);
connection.removeListener('error', reject);
});
connection.once('disconnect', () => this.connections.delete(channel.guild?.id || channel.id));
});
});
}
}
module.exports = ClientVoiceManager;

View File

@@ -0,0 +1,805 @@
'use strict';
const EventEmitter = require('events');
const { setTimeout } = require('node:timers');
const VoiceUDP = require('./networking/VoiceUDPClient');
const VoiceWebSocket = require('./networking/VoiceWebSocket');
const MediaPlayer = require('./player/MediaPlayer');
const VoiceReceiver = require('./receiver/Receiver');
const { parseStreamKey } = require('./util/Function');
const PlayInterface = require('./util/PlayInterface');
const Silence = require('./util/Silence');
const { Error } = require('../../errors');
const { Opcodes, VoiceOpcodes, VoiceStatus, Events } = require('../../util/Constants');
const Speaking = require('../../util/Speaking');
const Util = require('../../util/Util');
// Workaround for Discord now requiring silence to be sent before being able to receive audio
class SingleSilence extends Silence {
_read() {
super._read();
this.push(null);
}
}
const SUPPORTED_MODES = ['xsalsa20_poly1305_lite', 'xsalsa20_poly1305_suffix', 'xsalsa20_poly1305'];
const SUPPORTED_CODECS = ['VP8', 'H264'];
/**
* Represents a connection to a guild's voice server.
* ```js
* // Obtained using:
* client.voice.joinChannel(channel)
* .then(connection => {
*
* });
* ```
* @extends {EventEmitter}
* @implements {PlayInterface}
*/
class VoiceConnection extends EventEmitter {
constructor(voiceManager, channel) {
super();
/**
* The voice manager that instantiated this connection
* @type {ClientVoiceManager}
*/
this.voiceManager = voiceManager;
/**
* The voice channel this connection is currently serving
* @type {VoiceChannel}
*/
this.channel = channel;
/**
* The current status of the voice connection
* @type {VoiceStatus}
*/
this.status = VoiceStatus.AUTHENTICATING;
/**
* Our current speaking state
* @type {Readonly<Speaking>}
*/
this.speaking = new Speaking().freeze();
/**
* Our current video state
* @type {boolean}
*/
this.videoStatus = false;
/**
* The authentication data needed to connect to the voice server
* @type {Object}
* @private
*/
this.authentication = {};
/**
* The audio player for this voice connection
* @type {MediaPlayer}
*/
this.player = new MediaPlayer(this);
this.player.on('debug', m => {
/**
* Debug info from the connection.
* @event VoiceConnection#debug
* @param {string} message The debug message
*/
this.emit('debug', `audio player - ${m}`);
});
this.player.on('error', e => {
/**
* Warning info from the connection.
* @event VoiceConnection#warn
* @param {string|Error} warning The warning
*/
this.emit('warn', e);
});
this.once('closing', () => this.player.destroy());
/**
* Map SSRC values to user IDs
* @type {Map<number, Snowflake>}
* @private
*/
this.ssrcMap = new Map();
/**
* Tracks which users are talking
* @type {Map<Snowflake, Readonly<Speaking>>}
* @private
*/
this._speaking = new Map();
/**
* Object that wraps contains the `ws` and `udp` sockets of this voice connection
* @type {Object}
* @private
*/
this.sockets = {};
/**
* The voice receiver of this connection
* @type {VoiceReceiver}
*/
this.receiver = new VoiceReceiver(this);
/**
* Video codec (encoded)
* * `VP8`
* * `VP9` (Not supported)
* * `H264`
* * `H265` (Not supported)
* @typedef {string} VideoCodec
*/
/**
* The voice receiver of this connection
* @type {VideoCodec}
*/
this.videoCodec = 'H264';
/**
* Create a stream connection ?
* @type {?StreamConnection}
*/
this.streamConnection = null;
}
/**
* The client that instantiated this connection
* @type {Client}
* @readonly
*/
get client() {
return this.voiceManager.client;
}
/**
* The current audio dispatcher (if any)
* @type {?AudioDispatcher}
* @readonly
*/
get dispatcher() {
return this.player.dispatcher;
}
/**
* The current video dispatcher (if any)
* @type {?VideoDispatcher}
* @readonly
*/
get videoDispatcher() {
return this.player.videoDispatcher;
}
/**
* Sets whether the voice connection should display as "speaking", "soundshare" or "none".
* @param {BitFieldResolvable} value The new speaking state
*/
setSpeaking(value) {
if (this.speaking.equals(value)) return;
if (this.status !== VoiceStatus.CONNECTED) return;
this.speaking = new Speaking(value).freeze();
this.sockets.ws
.sendPacket({
op: VoiceOpcodes.SPEAKING,
d: {
speaking: this.speaking.bitfield,
delay: 0,
ssrc: this.authentication.ssrc,
},
})
.catch(e => {
this.emit('debug', e);
});
}
/**
* Set video codec before select protocol
* @param {VideoCodec} value Codec
* @returns {VoiceConnection}
*/
setVideoCodec(value) {
if (!SUPPORTED_CODECS.includes(value)) throw new Error('INVALID_VIDEO_CODEC', SUPPORTED_CODECS);
this.videoCodec = value;
return this;
}
/**
* Sets video status
* @param {boolean} value Video on or off
*/
setVideoStatus(value) {
if (this.status !== VoiceStatus.CONNECTED) return;
if (value === this.videoStatus) return;
this.videoStatus = value;
this.sockets.ws
.sendPacket({
op: VoiceOpcodes.SOURCES,
d: {
audio_ssrc: this.authentication.ssrc,
video_ssrc: value ? this.authentication.ssrc + 1 : 0,
rtx_ssrc: value ? this.authentication.ssrc + 2 : 0,
streams: [
{
type: 'video',
rid: '100',
ssrc: value ? this.authentication.ssrc + 1 : 0,
active: true,
quality: 100,
rtx_ssrc: value ? this.authentication.ssrc + 2 : 0,
max_bitrate: 8000000,
max_framerate: 60,
max_resolution: {
type: 'source',
width: 0,
height: 0,
},
},
],
},
})
.catch(e => {
this.emit('debug', e);
});
}
/**
* The voice state of this connection
* @type {?VoiceState}
*/
get voice() {
return this.client.user.voice;
}
/**
* Sends a request to the main gateway to join a voice channel.
* @param {Object} [options] The options to provide
* @returns {Promise<Shard>}
* @private
*/
sendVoiceStateUpdate(options = {}) {
options = Util.mergeDefault(
{
guild_id: this.channel.guild?.id || null,
channel_id: this.channel.id,
self_mute: this.voice ? this.voice.selfMute : false,
self_deaf: this.voice ? this.voice.selfDeaf : false,
self_video: this.voice ? this.voice.selfVideo : false,
flags: 2,
},
options,
);
this.emit('debug', `Sending voice state update: ${JSON.stringify(options)}`);
return this.channel.client.ws.broadcast({
op: Opcodes.VOICE_STATE_UPDATE,
d: options,
});
}
/**
* Set the token and endpoint required to connect to the voice servers.
* @param {string} token The voice token
* @param {string} endpoint The voice endpoint
* @returns {void}
* @private
*/
setTokenAndEndpoint(token, endpoint) {
this.emit('debug', `Token "${token}" and endpoint "${endpoint}"`);
if (!endpoint) {
// Signifies awaiting endpoint stage
return;
}
if (!token) {
this.authenticateFailed('VOICE_TOKEN_ABSENT');
return;
}
endpoint = endpoint.match(/([^:]*)/)[0];
this.emit('debug', `Endpoint resolved as ${endpoint}`);
if (!endpoint) {
this.authenticateFailed('VOICE_INVALID_ENDPOINT');
return;
}
if (this.status === VoiceStatus.AUTHENTICATING) {
this.authentication.token = token;
this.authentication.endpoint = endpoint;
this.checkAuthenticated();
} else if (token !== this.authentication.token || endpoint !== this.authentication.endpoint) {
this.reconnect(token, endpoint);
}
}
/**
* Sets the Session ID for the connection.
* @param {string} sessionId The voice session ID
* @private
*/
setSessionId(sessionId) {
this.emit('debug', `Setting sessionId ${sessionId} (stored as "${this.authentication.sessionId}")`);
if (!sessionId) {
this.authenticateFailed('VOICE_SESSION_ABSENT');
return;
}
if (this.status === VoiceStatus.AUTHENTICATING) {
this.authentication.sessionId = sessionId;
this.checkAuthenticated();
} else if (sessionId !== this.authentication.sessionId) {
this.authentication.sessionId = sessionId;
/**
* Emitted when a new session ID is received.
* @event VoiceConnection#newSession
* @private
*/
this.emit('newSession', sessionId);
}
}
/**
* Checks whether the voice connection is authenticated.
* @private
*/
checkAuthenticated() {
const { token, endpoint, sessionId } = this.authentication;
this.emit('debug', `Authenticated with sessionId ${sessionId}`);
if (token && endpoint && sessionId) {
this.status = VoiceStatus.CONNECTING;
/**
* Emitted when we successfully initiate a voice connection.
* @event VoiceConnection#authenticated
*/
this.emit('authenticated');
this.connect();
}
}
/**
* Invoked when we fail to initiate a voice connection.
* @param {string} reason The reason for failure
* @private
*/
authenticateFailed(reason) {
clearTimeout(this.connectTimeout);
this.emit('debug', `Authenticate failed - ${reason}`);
if (this.status === VoiceStatus.AUTHENTICATING) {
/**
* Emitted when we fail to initiate a voice connection.
* @event VoiceConnection#failed
* @param {Error} error The encountered error
*/
this.emit('failed', new Error(reason));
} else {
/**
* Emitted whenever the connection encounters an error.
* @event VoiceConnection#error
* @param {Error} error The encountered error
*/
this.emit('error', new Error(reason));
}
this.status = VoiceStatus.DISCONNECTED;
}
/**
* Move to a different voice channel in the same guild.
* @param {VoiceChannel} channel The channel to move to
* @private
*/
updateChannel(channel) {
this.channel = channel;
this.sendVoiceStateUpdate();
}
/**
* Attempts to authenticate to the voice server.
* @param {Object} options Join config
* @private
*/
authenticate(options = {}) {
this.sendVoiceStateUpdate(options);
this.connectTimeout = setTimeout(() => this.authenticateFailed('VOICE_CONNECTION_TIMEOUT'), 15_000).unref();
}
/**
* Attempts to reconnect to the voice server (typically after a region change).
* @param {string} token The voice token
* @param {string} endpoint The voice endpoint
* @private
*/
reconnect(token, endpoint) {
this.authentication.token = token;
this.authentication.endpoint = endpoint;
this.speaking = new Speaking().freeze();
this.status = VoiceStatus.RECONNECTING;
this.emit('debug', `Reconnecting to ${endpoint}`);
/**
* Emitted when the voice connection is reconnecting (typically after a region change).
* @event VoiceConnection#reconnecting
*/
this.emit('reconnecting');
this.connect();
}
/**
* Disconnects the voice connection, causing a disconnect and closing event to be emitted.
*/
disconnect() {
this.emit('closing');
this.emit('debug', 'disconnect() triggered');
clearTimeout(this.connectTimeout);
const conn = this.voiceManager.connections.get(this.channel.guild?.id || this.channel.id);
if (conn === this) this.voiceManager.connections.delete(this.channel.guild?.id || this.channel.id);
this.sendVoiceStateUpdate({
channel_id: null,
});
this._disconnect();
}
/**
* Internally disconnects (doesn't send disconnect packet).
* @private
*/
_disconnect() {
this.cleanup();
this.status = VoiceStatus.DISCONNECTED;
/**
* Emitted when the voice connection disconnects.
* @event VoiceConnection#disconnect
*/
this.emit('disconnect');
}
/**
* Cleans up after disconnect.
* @private
*/
cleanup() {
this.player.destroy();
this.speaking = new Speaking().freeze();
const { ws, udp } = this.sockets;
this.emit('debug', 'Connection clean up');
if (ws) {
ws.removeAllListeners('error');
ws.removeAllListeners('ready');
ws.removeAllListeners('sessionDescription');
ws.removeAllListeners('speaking');
ws.shutdown();
}
if (udp) udp.removeAllListeners('error');
this.sockets.ws = null;
this.sockets.udp = null;
}
/**
* Connect the voice connection.
* @private
*/
connect() {
this.emit('debug', `Connect triggered`);
if (this.status !== VoiceStatus.RECONNECTING) {
if (this.sockets.ws) throw new Error('WS_CONNECTION_EXISTS');
if (this.sockets.udp) throw new Error('UDP_CONNECTION_EXISTS');
}
if (this.sockets.ws) this.sockets.ws.shutdown();
if (this.sockets.udp) this.sockets.udp.shutdown();
this.sockets.ws = new VoiceWebSocket(this);
this.sockets.udp = new VoiceUDP(this);
const { ws, udp } = this.sockets;
ws.on('debug', msg => this.emit('debug', msg));
udp.on('debug', msg => this.emit('debug', msg));
ws.on('error', err => this.emit('error', err));
udp.on('error', err => this.emit('error', err));
ws.on('ready', this.onReady.bind(this));
ws.on('sessionDescription', this.onSessionDescription.bind(this));
ws.on('startSpeaking', this.onStartSpeaking.bind(this));
this.sockets.ws.connect();
}
/**
* Invoked when the voice websocket is ready.
* @param {Object} data The received data
* @private
*/
onReady(data) {
Object.assign(this.authentication, data);
for (let mode of data.modes) {
if (SUPPORTED_MODES.includes(mode)) {
this.authentication.mode = mode;
this.emit('debug', `Selecting the ${mode} mode`);
break;
}
}
this.sockets.udp.createUDPSocket(data.ip);
}
/**
* Invoked when a session description is received.
* @param {Object} data The received data
* @private
*/
onSessionDescription(data) {
Object.assign(this.authentication, data);
this.status = VoiceStatus.CONNECTED;
const ready = () => {
clearTimeout(this.connectTimeout);
this.emit('debug', `Ready with authentication details: ${JSON.stringify(this.authentication)}`);
/**
* Emitted once the connection is ready, when a promise to join a voice channel resolves,
* the connection will already be ready.
* @event VoiceConnection#ready
*/
this.emit('ready');
};
if (this.dispatcher || this.videoDispatcher) {
ready();
} else {
// This serves to provide support for voice receive, sending audio is required to receive it.
const dispatcher = this.playAudio(new SingleSilence(), { type: 'opus', volume: false });
dispatcher.once('finish', ready);
}
}
onStartSpeaking({ user_id, ssrc, speaking }) {
this.ssrcMap.set(+ssrc, {
...(this.ssrcMap.get(+ssrc) || {}),
userId: user_id,
speaking: speaking,
});
}
/**
* Invoked when a speaking event is received.
* @param {Object} data The received data
* @private
*/
onSpeaking({ user_id, speaking }) {
speaking = new Speaking(speaking).freeze();
const guild = this.channel.guild;
const user = this.client.users.cache.get(user_id);
const old = this._speaking.get(user_id) || new Speaking(0).freeze();
this._speaking.set(user_id, speaking);
/**
* Emitted whenever a user changes speaking state.
* @event VoiceConnection#speaking
* @param {User} user The user that has changed speaking state
* @param {Readonly<Speaking>} speaking The speaking state of the user
*/
if (this.status === VoiceStatus.CONNECTED) {
this.emit('speaking', user, speaking);
if (!speaking.has(Speaking.FLAGS.SPEAKING)) {
this.receiver.packets._stoppedSpeaking(user_id);
}
}
if (guild && user && !speaking.equals(old)) {
const member = guild.members.cache.get(user);
if (member) {
/**
* Emitted once a guild member changes speaking state.
* @event Client#guildMemberSpeaking
* @param {GuildMember} member The member that started/stopped speaking
* @param {Readonly<Speaking>} speaking The speaking state of the member
*/
this.client.emit(Events.GUILD_MEMBER_SPEAKING, member, speaking);
}
}
}
playAudio() {} // eslint-disable-line no-empty-function
playVideo() {} // eslint-disable-line no-empty-function
/**
* Create new connection to screenshare stream
* @returns {Promise<StreamConnection>}
*/
createStreamConnection() {
// eslint-disable-next-line consistent-return
return new Promise((resolve, reject) => {
if (this.streamConnection) {
return resolve(this.streamConnection);
} else {
const connection = (this.streamConnection = new StreamConnection(this.voiceManager, this.channel, this));
connection.setVideoCodec(this.videoCodec); // Sync :?
// Setup event...
if (!this.eventHook) {
this.eventHook = true; // Dont listen this event two times :/
this.channel.client.on('raw', packet => {
if (
typeof packet !== 'object' ||
!packet.t ||
!packet.d ||
!this.streamConnection ||
!packet.d?.stream_key
) {
return;
}
const { t: event, d: data } = packet;
const StreamKey = parseStreamKey(data.stream_key);
if (StreamKey.userId === this.channel.client.user.id && this.channel.id == StreamKey.channelId) {
switch (event) {
case 'STREAM_CREATE': {
this.streamConnection.setSessionId(this.authentication.sessionId);
this.streamConnection.serverId = data.rtc_server_id;
break;
}
case 'STREAM_SERVER_UPDATE': {
this.streamConnection.setTokenAndEndpoint(data.token, data.endpoint);
break;
}
case 'STREAM_DELETE': {
this.streamConnection.disconnect();
break;
}
}
}
});
}
connection.sendSignalScreenshare();
connection.sendScreenshareState(true);
connection.on('debug', msg =>
this.channel.client.emit(
'debug',
`[VOICE STREAM (${this.channel.guild?.id || this.channel.id}:${connection.status})]: ${msg}`,
),
);
connection.once('failed', reason => {
this.streamConnection = null;
reject(reason);
});
connection.on('error', reject);
connection.once('authenticated', () => {
connection.once('ready', () => {
resolve(connection);
connection.removeListener('error', reject);
});
connection.once('disconnect', () => {
this.streamConnection = null;
});
});
}
});
}
}
class StreamConnection extends VoiceConnection {
#requestDisconnect = false;
constructor(voiceManager, channel, voiceConnection) {
super(voiceManager, channel);
/**
* Current voice connection
* @type {VoiceConnection}
*/
this.voiceConnection = voiceConnection;
Object.defineProperty(this, 'voiceConnection', {
value: voiceConnection,
writable: false,
});
/**
* Server Id
* @type {string | null}
*/
this.serverId = null;
/**
* Stream state
* @type {boolean}
*/
this.isPaused = false;
}
createStreamConnection() {
return Promise.resolve(this);
}
get streamConnection() {
return this;
}
set streamConnection(value) {
// Why ?
}
disconnect() {
if (this.#requestDisconnect) return;
this.emit('closing');
this.emit('debug', 'Stream: disconnect() triggered');
clearTimeout(this.connectTimeout);
if (this.voiceConnection.streamConnection === this) this.voiceConnection.streamConnection = null;
this.sendStopScreenshare();
this._disconnect();
}
/**
* Create new stream connection (WS packet)
* @returns {void}
*/
sendSignalScreenshare() {
const data = {
type: ['DM', 'GROUP_DM'].includes(this.channel.type) ? 'call' : 'guild',
guild_id: this.channel.guild?.id || null,
channel_id: this.channel.id,
preferred_region: null,
};
this.emit('debug', `Signal Stream: ${JSON.stringify(data)}`);
return this.channel.client.ws.broadcast({
op: Opcodes.STREAM_CREATE,
d: data,
});
}
/**
* Send screenshare state... (WS)
* @param {boolean} isPaused screenshare paused ?
* @returns {void}
*/
sendScreenshareState(isPaused = false) {
if (isPaused == this.isPaused) return;
this.isPaused = isPaused;
this.channel.client.ws.broadcast({
op: Opcodes.STREAM_SET_PAUSED,
d: {
stream_key: this.streamKey,
paused: isPaused,
},
});
}
/**
* Stop screenshare, delete this connection (WS)
* @returns {void}
* @private Using StreamConnection#disconnect()
*/
sendStopScreenshare() {
this.#requestDisconnect = true;
this.channel.client.ws.broadcast({
op: Opcodes.STREAM_DELETE,
d: {
stream_key: this.streamKey,
},
});
}
/**
* Current stream key
* @type {string}
*/
get streamKey() {
return `${['DM', 'GROUP_DM'].includes(this.channel.type) ? 'call' : `guild:${this.channel.guild.id}`}:${
this.channel.id
}:${this.channel.client.user.id}`;
}
}
PlayInterface.applyToClass(VoiceConnection);
PlayInterface.applyToClass(StreamConnection);
module.exports = VoiceConnection;

View File

@@ -0,0 +1,126 @@
'use strict';
/*
Credit: https://github.com/dank074/Discord-video-stream
The use of video streaming in this library is an incomplete implementation with many bugs, primarily aimed at lazy users.
The video streaming features in this library are sourced from https://github.com/dank074/Discord-video-stream.
Please use the @dank074/discord-video-stream library to access all advanced and professional features,
along with comprehensive support. I will not actively fix bugs related to streaming and encourage everyone to
use https://github.com/dank074/Discord-video-stream for stable and smooth streaming.
To reiterate: This is an incomplete implementation of the library https://github.com/dank074/Discord-video-stream.
Thanks to dank074 and longnguyen2004 for implementing new codecs (H264, H265).
Thanks to mrjvs for discovering how Discord transmits data and the VP8 codec.
Please use the @dank074/discord-video-stream library for the best support.
*/
const { Buffer } = require('buffer');
const VideoDispatcher = require('./VideoDispatcher');
const { H264Helpers, H265Helpers } = require('../player/processing/AnnexBNalSplitter');
class AnnexBDispatcher extends VideoDispatcher {
constructor(player, highWaterMark = 12, streams, fps, nalFunctions) {
super(player, highWaterMark, streams, fps);
this._nalFunctions = nalFunctions;
}
codecCallback(frame) {
let accessUnit = frame;
const nalus = [];
let offset = 0;
// Extract NALUs from the access unit
while (offset < accessUnit.length) {
const naluSize = accessUnit.readUInt32BE(offset);
offset += 4;
const nalu = accessUnit.subarray(offset, offset + naluSize);
nalus.push(nalu);
offset += naluSize;
}
nalus.forEach((nalu, index) => {
const isLastNal = index === nalus.length - 1;
if (nalu.length <= this.mtu) {
// If NALU size is within MTU, send it directly
this._playChunk(Buffer.concat([this.createHeaderExtension(), nalu]), index + 1 === nalus.length);
} else {
// If NALU size exceeds MTU, fragment it
const [naluHeader, naluData] = this._nalFunctions.splitHeader(nalu);
const dataFragments = this.partitionVideoData(naluData);
dataFragments.forEach((data, fragmentIndex) => {
const isFirstPacket = fragmentIndex === 0;
const isFinalPacket = fragmentIndex === dataFragments.length - 1;
const markerBit = isLastNal && isFinalPacket; // Is last packet ?
this._playChunk(
Buffer.concat([
this.createHeaderExtension(),
this.makeFragmentationUnitHeader(isFirstPacket, isFinalPacket, naluHeader),
data,
]),
markerBit,
);
});
}
});
}
}
class H264Dispatcher extends AnnexBDispatcher {
constructor(player, highWaterMark = 12, streams, fps) {
super(player, highWaterMark, streams, fps, H264Helpers);
}
makeFragmentationUnitHeader(isFirstPacket, isLastPacket, naluHeader) {
const nal0 = naluHeader[0];
const fuPayloadHeader = Buffer.alloc(2);
const nalType = H264Helpers.getUnitType(naluHeader);
const fnri = nal0 & 0xe0;
fuPayloadHeader[0] = 0x1c | fnri;
if (isFirstPacket) {
fuPayloadHeader[1] = 0x80 | nalType;
} else if (isLastPacket) {
fuPayloadHeader[1] = 0x40 | nalType;
} else {
fuPayloadHeader[1] = nalType;
}
return fuPayloadHeader;
}
}
class H265Dispatcher extends AnnexBDispatcher {
constructor(player, highWaterMark = 12, streams, fps) {
super(player, highWaterMark, streams, fps, H265Helpers);
}
makeFragmentationUnitHeader(isFirstPacket, isLastPacket, naluHeader) {
const fuIndicatorHeader = Buffer.allocUnsafe(3);
naluHeader.copy(fuIndicatorHeader);
const nalType = H265Helpers.getUnitType(naluHeader);
fuIndicatorHeader[0] = (fuIndicatorHeader[0] & 0b10000001) | (49 << 1);
if (isFirstPacket) {
fuIndicatorHeader[2] = 0x80 | nalType;
} else if (isLastPacket) {
fuIndicatorHeader[2] = 0x40 | nalType;
} else {
fuIndicatorHeader[2] = nalType;
}
return fuIndicatorHeader;
}
}
module.exports = {
H264Dispatcher,
H265Dispatcher,
};

View File

@@ -0,0 +1,115 @@
'use strict';
const BaseDispatcher = require('./BaseDispatcher');
const Silence = require('../util/Silence');
const VolumeInterface = require('../util/VolumeInterface');
/**
* @external WritableStream
* @see {@link https://nodejs.org/api/stream.html#stream_class_stream_writable}
*/
/**
* The class that sends voice packet data to the voice connection.
* ```js
* // Obtained using:
* client.voice.joinChannel(channel).then(connection => {
* // You can play a file or a stream here:
* const dispatcher = connection.playAudio('/home/hydrabolt/audio.mp3');
* });
* ```
* @implements {VolumeInterface}
* @extends {WritableStream}
*/
class AudioDispatcher extends BaseDispatcher {
constructor(player, { seek = 0, volume = 1, fec, plp, bitrate = 96, highWaterMark = 12 } = {}, streams) {
const streamOptions = { seek, volume, fec, plp, bitrate, highWaterMark };
super(player, highWaterMark, 120, false, streams);
this.streamOptions = streamOptions;
this.streams.silence = new Silence();
this.setVolume(volume);
this.setBitrate(bitrate);
if (typeof fec !== 'undefined') this.setFEC(fec);
if (typeof plp !== 'undefined') this.setPLP(plp);
}
/**
* Set the bitrate of the current Opus encoder if using a compatible Opus stream.
* @param {number} value New bitrate, in kbps
* If set to 'auto', the voice channel's bitrate will be used
* @returns {boolean} true if the bitrate has been successfully changed.
*/
setBitrate(value) {
if (!value || !this.bitrateEditable) return false;
const bitrate = value === 'auto' ? this.player.voiceConnection.channel.bitrate : value;
this.streams.opus.setBitrate(bitrate * 1000);
return true;
}
/**
* Sets the expected packet loss percentage if using a compatible Opus stream.
* @param {number} value between 0 and 1
* @returns {boolean} Returns true if it was successfully set.
*/
setPLP(value) {
if (!this.bitrateEditable) return false;
this.streams.opus.setPLP(value);
return true;
}
/**
* Enables or disables forward error correction if using a compatible Opus stream.
* @param {boolean} enabled true to enable
* @returns {boolean} Returns true if it was successfully set.
*/
setFEC(enabled) {
if (!this.bitrateEditable) return false;
this.streams.opus.setFEC(enabled);
return true;
}
get volumeEditable() {
return Boolean(this.streams.volume);
}
/**
* Whether or not the Opus bitrate of this stream is editable
* @type {boolean}
* @readonly
*/
get bitrateEditable() {
return this.streams.opus && this.streams.opus.setBitrate;
}
// Volume
get volume() {
return this.streams.volume ? this.streams.volume.volume : 1;
}
setVolume(value) {
if (!this.streams.volume) return false;
/**
* Emitted when the volume of this dispatcher changes.
* @event AudioDispatcher#volumeChange
* @param {number} oldVolume The old volume of this dispatcher
* @param {number} newVolume The new volume of this dispatcher
*/
this.emit('volumeChange', this.volume, value);
this.streams.volume.setVolume(value);
return true;
}
// Volume stubs for docs
/* eslint-disable no-empty-function*/
get volumeDecibels() {}
get volumeLogarithmic() {}
setVolumeDecibels() {}
setVolumeLogarithmic() {}
}
VolumeInterface.applyToClass(AudioDispatcher);
module.exports = AudioDispatcher;

View File

@@ -0,0 +1,401 @@
'use strict';
const { Buffer } = require('node:buffer');
const { setTimeout } = require('node:timers');
const { Writable } = require('stream');
const find = require('find-process');
const kill = require('tree-kill');
const secretbox = require('../util/Secretbox');
const CHANNELS = 2;
const MAX_NONCE_SIZE = 2 ** 32 - 1;
const nonce = Buffer.alloc(24);
/**
* @external WritableStream
* @see {@link https://nodejs.org/api/stream.html#stream_class_stream_writable}
*/
class BaseDispatcher extends Writable {
constructor(player, highWaterMark = 12, payloadType, extensionEnabled, streams = {}) {
super({
highWaterMark,
});
this.streams = streams;
/**
* The Audio Player that controls this dispatcher
* @type {MediaPlayer}
*/
this.player = player;
this.payloadType = payloadType;
this.extensionEnabled = extensionEnabled;
this._nonce = 0;
this._nonceBuffer = Buffer.alloc(24);
/**
* The time that the stream was paused at (null if not paused)
* @type {?number}
*/
this.pausedSince = null;
this._writeCallback = null;
this._pausedTime = 0;
this._silentPausedTime = 0;
this.count = 0;
this.sequence = 0;
this.timestamp = 0;
/**
* Video FPS
* @type {number}
*/
this.fps = 0;
this.mtu = 1200;
const streamError = (type, err) => {
/**
* Emitted when the dispatcher encounters an error.
* @event AudioDispatcher#error
*/
if (type && err) {
err.message = `${type} stream: ${err.message}`;
this.emit(this.player.dispatcher === this ? 'error' : 'debug', err);
}
this.destroy();
};
this.on('error', () => streamError());
if (this.streams.input) this.streams.input.on('error', err => streamError('input', err));
if (this.streams.ffmpeg) this.streams.ffmpeg.on('error', err => streamError('ffmpeg', err));
if (this.streams.opus) this.streams.opus.on('error', err => streamError('opus', err));
if (this.streams.volume) this.streams.volume.on('error', err => streamError('volume', err));
this.on('finish', () => {
this._cleanup();
this._setSpeaking(0);
this._setVideoStatus(false);
this._setStreamStatus(true);
});
}
get TIMESTAMP_INC() {
return this.extensionEnabled ? 90000 / this.fps : 480 * CHANNELS;
}
get FRAME_LENGTH() {
return this.extensionEnabled ? 1000 / this.fps : 20;
}
partitionVideoData(data) {
const out = [];
const dataLength = data.length;
for (let i = 0; i < dataLength; i += this.mtu) {
out.push(data.slice(i, i + this.mtu));
}
return out;
}
getNewSequence() {
const currentSeq = this.sequence;
this.sequence++;
if (this.sequence >= 2 ** 16) this.sequence = 0;
return currentSeq;
}
_write(chunk, enc, done) {
if (!this.startTime) {
/**
* Emitted once the stream has started to play.
* @event AudioDispatcher#start
*/
this.emit('start');
this.startTime = performance.now();
}
if (this.extensionEnabled) {
this.codecCallback(chunk);
} else {
this._playChunk(chunk);
}
this._step(done);
}
_destroy(err, cb) {
this._cleanup();
super._destroy(err, cb);
}
_cleanup() {
if (this.player.dispatcher === this) this.player.dispatcher = null;
const { streams } = this;
if (streams.opus) streams.opus.destroy();
if (streams.ffmpeg) {
const ffmpegPid = streams.ffmpeg.process.pid; // But it is ppid ;-;
const args = streams.ffmpeg.process.spawnargs.slice(1).join(' '); // Skip ffmpeg
find('name', 'ffmpeg', true).then(list => {
let process = list.find(o => o.pid === ffmpegPid || o.ppid === ffmpegPid || o.cmd.includes(args));
if (process) {
kill(process.pid);
}
});
streams.ffmpeg.destroy();
}
}
/**
* Pauses playback
* @param {boolean} [silence=false] Whether to play silence while paused to prevent audio glitches
*/
pause(silence = false) {
if (this.paused) return;
if (this.streams.opus) this.streams.opus.unpipe(this); // Audio
if (this.streams.video) {
this.streams.ffmpeg.pause();
this.streams.video.unpipe(this);
}
if (!this.extensionEnabled) {
// Audio
if (silence) {
this.streams.silence.pipe(this);
this._silence = true;
} else {
this._setSpeaking(0);
}
}
this.pausedSince = performance.now();
}
/**
* Whether or not playback is paused
* @type {boolean}
* @readonly
*/
get paused() {
return Boolean(this.pausedSince);
}
/**
* Total time that this dispatcher has been paused in milliseconds
* @type {number}
* @readonly
*/
get pausedTime() {
return this._silentPausedTime + this._pausedTime + (this.paused ? performance.now() - this.pausedSince : 0);
}
/**
* Resumes playback
*/
resume() {
if (!this.pausedSince) return;
if (!this.extensionEnabled) this.streams.silence.unpipe(this);
if (this.streams.opus) this.streams.opus.pipe(this);
if (this.streams.video) {
this.streams.ffmpeg.resume();
this.streams.video.pipe(this);
}
if (this._silence) {
this._silentPausedTime += performance.now() - this.pausedSince;
this._silence = false;
} else {
this._pausedTime += performance.now() - this.pausedSince;
}
this.pausedSince = null;
if (typeof this._writeCallback === 'function') this._writeCallback();
}
/**
* The time (in milliseconds) that the dispatcher has been playing audio for, taking into account skips and pauses
* @type {number}
* @readonly
*/
get totalStreamTime() {
return performance.now() - this.startTime;
}
_step(done) {
this._writeCallback = () => {
this._writeCallback = null;
done();
};
const next = (this.count + 1) * this.FRAME_LENGTH - (performance.now() - this.startTime - this._pausedTime);
setTimeout(() => {
if ((!this.pausedSince || this._silence) && this._writeCallback) this._writeCallback();
}, next).unref();
this.timestamp += this.TIMESTAMP_INC;
if (this.timestamp >= 2 ** 32) this.timestamp = 0;
this.count++;
}
_final(callback) {
this._writeCallback = null;
callback();
}
_playChunk(chunk, isLastPacket) {
if (
(this.player.dispatcher !== this && this.player.videoDispatcher !== this) ||
!this.player.voiceConnection.authentication.secret_key
) {
return;
}
this[this.extensionEnabled ? '_sendVideoPacket' : '_sendPacket'](this._createPacket(chunk, isLastPacket));
}
/**
* Creates a single extension of type playout-delay
* Discord seems to send this extension on every video packet
* @see https://webrtc.googlesource.com/src/+/refs/heads/main/docs/native-code/rtp-hdrext/playout-delay
* @returns {Buffer} playout-delay extension
* @private
*/
createHeaderExtension() {
const extensions = [{ id: 5, len: 2, val: 0 }];
/**
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| defined by profile | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
const profile = Buffer.alloc(4);
profile[0] = 0xbe;
profile[1] = 0xde;
profile.writeInt16BE(extensions.length, 2); // Extension count
const extensionsData = [];
for (let ext of extensions) {
/**
* EXTENSION DATA - each extension payload is 32 bits
*/
const data = Buffer.alloc(4);
/**
* 0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+
| ID | len |
+-+-+-+-+-+-+-+-+
where len = actual length - 1
*/
data[0] = (ext.id & 0b00001111) << 4;
data[0] |= (ext.len - 1) & 0b00001111;
/** Specific to type playout-delay
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| MIN delay | MAX delay |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
data.writeUIntBE(ext.val, 1, 2); // Not quite but its 0 anyway
extensionsData.push(data);
}
return Buffer.concat([profile, ...extensionsData]);
}
_encrypt(buffer) {
const { secret_key, mode } = this.player.voiceConnection.authentication;
if (mode === 'xsalsa20_poly1305_lite') {
this._nonce++;
if (this._nonce > MAX_NONCE_SIZE) this._nonce = 0;
this._nonceBuffer.writeUInt32BE(this._nonce, 0);
return [secretbox.methods.close(buffer, this._nonceBuffer, secret_key), this._nonceBuffer.slice(0, 4)];
} else if (mode === 'xsalsa20_poly1305_suffix') {
const random = secretbox.methods.random(24);
return [secretbox.methods.close(buffer, random, secret_key), random];
} else {
return [secretbox.methods.close(buffer, nonce, secret_key)];
}
}
_createPacket(buffer, isLastPacket = false) {
// Header
const packetBuffer = Buffer.alloc(12);
packetBuffer[0] = (2 << 6) | ((this.extensionEnabled ? 1 : 0) << 4);
packetBuffer[1] = this.payloadType;
if (this.extensionEnabled) {
if (isLastPacket) {
packetBuffer[1] |= 0b10000000;
}
}
packetBuffer.writeUIntBE(this.getNewSequence(), 2, 2);
packetBuffer.writeUIntBE(this.timestamp, 4, 4);
packetBuffer.writeUIntBE(this.player.voiceConnection.authentication.ssrc + this.extensionEnabled, 8, 4);
packetBuffer.copy(nonce, 0, 0, 12);
return Buffer.concat([packetBuffer, ...this._encrypt(buffer)]);
}
_sendPacket(packet) {
/**
* Emitted whenever the dispatcher has debug information.
* @event AudioDispatcher#debug
* @param {string} info The debug info
*/
this._setSpeaking(1);
if (!this.player.voiceConnection.sockets.udp) {
this.emit('debug', 'Failed to send a packet - no UDP socket');
return;
}
this.player.voiceConnection.sockets.udp.send(packet).catch(e => {
this._setSpeaking(0);
this.emit('debug', `Failed to send a packet - ${e}`);
});
}
_sendVideoPacket(packet) {
this._setVideoStatus(true);
this._setStreamStatus(false);
if (!this.player.voiceConnection.sockets.udp) {
this.emit('debug', 'Failed to send a video packet - no UDP socket');
return;
}
this.player.voiceConnection.sockets.udp.send(packet).catch(e => {
this._setVideoStatus(false);
this._setStreamStatus(true);
this.emit('debug', `Failed to send a video packet - ${e}`);
});
}
_setSpeaking(value) {
if (typeof this.player.voiceConnection !== 'undefined') {
this.player.voiceConnection.setSpeaking(value);
}
/**
* Emitted when the dispatcher starts/stops speaking.
* @event AudioDispatcher#speaking
* @param {boolean} value Whether or not the dispatcher is speaking
*/
this.emit('speaking', value);
}
_setVideoStatus(value) {
if (typeof this.player.voiceConnection !== 'undefined') {
this.player.voiceConnection.setVideoStatus(value);
}
/**
* Emitted when the dispatcher starts/stops video.
* @event AudioDispatcher#videoStatus
* @param {boolean} value Whether or not the dispatcher is enable video
*/
this.emit('videoStatus', value);
}
_setStreamStatus(value) {
if (typeof this.player.voiceConnection?.sendScreenshareState !== 'undefined') {
this.player.voiceConnection.sendScreenshareState(value);
}
/**
* Emitted when the dispatcher starts/stops video.
* @event AudioDispatcher#streamStatus
* @param {boolean} isPaused Whether or not the dispatcher is pause video
*/
this.emit('streamStatus', value);
}
}
module.exports = BaseDispatcher;

View File

@@ -0,0 +1,55 @@
'use strict';
/*
Credit: https://github.com/dank074/Discord-video-stream
The use of video streaming in this library is an incomplete implementation with many bugs, primarily aimed at lazy users.
The video streaming features in this library are sourced from https://github.com/dank074/Discord-video-stream.
Please use the @dank074/discord-video-stream library to access all advanced and professional features,
along with comprehensive support. I will not actively fix bugs related to streaming and encourage everyone to
use https://github.com/dank074/Discord-video-stream for stable and smooth streaming.
To reiterate: This is an incomplete implementation of the library https://github.com/dank074/Discord-video-stream.
Thanks to dank074 and longnguyen2004 for implementing new codecs (H264, H265).
Thanks to mrjvs for discovering how Discord transmits data and the VP8 codec.
Please use the @dank074/discord-video-stream library for the best support.
*/
const { Buffer } = require('node:buffer');
const VideoDispatcher = require('./VideoDispatcher');
class VP8Dispatcher extends VideoDispatcher {
constructor(player, highWaterMark = 12, streams, fps) {
super(player, highWaterMark, streams, fps);
}
makeChunk(buffer, i) {
// Make frame
const headerExtensionBuf = this.createHeaderExtension();
// Vp8 payload descriptor
const payloadDescriptorBuf = Buffer.alloc(2);
payloadDescriptorBuf[0] = 0x80;
payloadDescriptorBuf[1] = 0x80;
if (i == 0) {
payloadDescriptorBuf[0] |= 0b00010000; // Mark S bit, indicates start of frame
}
// Vp8 pictureid payload extension
const pictureIdBuf = Buffer.alloc(2);
pictureIdBuf.writeUIntBE(this.count, 0, 2);
pictureIdBuf[0] |= 0b10000000;
return Buffer.concat([headerExtensionBuf, payloadDescriptorBuf, pictureIdBuf, buffer]);
}
codecCallback(chunk) {
const chunkSplit = this.partitionVideoData(chunk);
for (let i = 0; i < chunkSplit.length; i++) {
this._playChunk(this.makeChunk(chunkSplit[i], i), i + 1 === chunkSplit.length);
}
}
}
module.exports = {
VP8Dispatcher,
};

View File

@@ -0,0 +1,32 @@
'use strict';
const BaseDispatcher = require('./BaseDispatcher');
/**
* The class that sends video packet data to the voice connection.
* ```js
* // Obtained using:
* client.voice.joinChannel(channel).then(connection => {
* // You can play a file or a stream here:
* const dispatcher = connection.playVideo('/home/hydrabolt/video.mp4', { fps: 60, preset: 'ultrafast' });
* });
* ```
* @implements {VolumeInterface}
* @extends {WritableStream}
*/
class VideoDispatcher extends BaseDispatcher {
constructor(player, highWaterMark = 12, streams, fps) {
super(player, highWaterMark, 101, true, streams);
this.fps = fps;
}
/**
* Set FPS
* @param {number} value fps
*/
setFPSSource(value) {
this.fps = value;
}
}
module.exports = VideoDispatcher;

View File

@@ -0,0 +1,188 @@
'use strict';
const udp = require('dgram');
const EventEmitter = require('events');
const { isIPv4 } = require('net');
const { Buffer } = require('node:buffer');
const { Error } = require('../../../errors');
const { VoiceOpcodes } = require('../../../util/Constants');
/**
* Represents a UDP client for a Voice Connection.
* @extends {EventEmitter}
* @private
*/
class VoiceConnectionUDPClient extends EventEmitter {
constructor(voiceConnection) {
super();
/**
* The voice connection that this UDP client serves
* @type {VoiceConnection}
*/
this.voiceConnection = voiceConnection;
/**
* The UDP socket
* @type {?Socket}
*/
this.socket = null;
/**
* The address of the Discord voice server
* @type {?string}
*/
this.discordAddress = null;
/**
* The local IP address
* @type {?string}
*/
this.localAddress = null;
/**
* The local port
* @type {?string}
*/
this.localPort = null;
this.voiceConnection.on('closing', this.shutdown.bind(this));
}
shutdown() {
this.emit('debug', `[UDP] shutdown requested`);
if (this.socket) {
this.socket.removeAllListeners('message');
try {
this.socket.close();
} finally {
this.socket = null;
}
}
}
/**
* The port of the Discord voice server
* @type {number}
* @readonly
*/
get discordPort() {
return this.voiceConnection.authentication.port;
}
/**
* Send a packet to the UDP client.
* @param {Object} packet The packet to send
* @returns {Promise<Object>}
*/
send(packet) {
return new Promise((resolve, reject) => {
if (!this.socket) throw new Error('UDP_SEND_FAIL');
if (!this.discordAddress || !this.discordPort) throw new Error('UDP_ADDRESS_MALFORMED');
this.socket.send(packet, 0, packet.length, this.discordPort, this.discordAddress, error => {
if (error) {
this.emit('debug', `[UDP] >> ERROR: ${error}`);
reject(error);
} else {
resolve(packet);
}
});
});
}
async createUDPSocket(address) {
this.discordAddress = address;
const socket = (this.socket = udp.createSocket('udp4'));
socket.on('error', e => {
this.emit('debug', `[UDP] Error: ${e}`);
this.emit('error', e);
});
socket.on('close', () => {
this.emit('debug', '[UDP] socket closed');
});
this.emit('debug', `[UDP] created socket`);
socket.once('message', message => {
this.emit('debug', `[UDP] message: [${[...message]}] (${message})`);
if (message.readUInt16BE(0) !== 2) {
throw new Error('UDP_WRONG_HANDSHAKE');
}
// Stop if the sockets have been deleted because the connection has been closed already
if (!this.voiceConnection.sockets.ws) return;
const packet = parseLocalPacket(message);
if (packet.error) {
this.emit('debug', `[UDP] ERROR: ${packet.error}`);
this.emit('error', packet.error);
return;
}
this.emit('debug', `[UDP] Parse local packet: ${packet.address}:${packet.port}`);
this.localAddress = packet.address;
this.localPort = packet.port;
this.voiceConnection.sockets.ws.sendPacket({
op: VoiceOpcodes.SELECT_PROTOCOL,
d: {
protocol: 'udp',
codecs: [
{
name: 'opus',
type: 'audio',
priority: 1000,
payload_type: 120,
},
{
name: this.voiceConnection.videoCodec,
type: 'video',
priority: 1000,
payload_type: 101,
rtx_payload_type: 102,
encode: true,
decode: true,
},
],
data: {
address: packet.address,
port: packet.port,
mode: this.voiceConnection.authentication.mode,
},
},
});
// Write = false
Object.defineProperty(this.voiceConnection, 'videoCodec', {
value: this.voiceConnection.videoCodec,
writable: false,
});
this.emit('debug', `[UDP] << ${JSON.stringify(packet)}`);
socket.on('message', buffer => this.voiceConnection.receiver.packets.push(buffer));
});
const blankMessage = Buffer.alloc(74);
blankMessage.writeUInt16BE(1, 0);
blankMessage.writeUInt16BE(70, 2);
blankMessage.writeUInt32BE(this.voiceConnection.authentication.ssrc, 4);
this.emit('debug', `Sending IP discovery packet: [${[...blankMessage]}]`);
await this.send(blankMessage);
this.emit('debug', `Successfully sent IP discovery packet`);
}
}
function parseLocalPacket(message) {
try {
const packet = Buffer.from(message);
const address = packet.subarray(8, packet.indexOf(0, 8)).toString('utf8');
if (!isIPv4(address)) {
throw new Error('UDP_ADDRESS_MALFORMED');
}
const port = packet.readUInt16BE(packet.length - 2);
return { address, port };
} catch (error) {
return { error };
}
}
module.exports = VoiceConnectionUDPClient;

View File

@@ -0,0 +1,272 @@
'use strict';
const EventEmitter = require('events');
const { setTimeout, setInterval } = require('node:timers');
const WebSocket = require('../../../WebSocket');
const { Error } = require('../../../errors');
const { Opcodes, VoiceOpcodes } = require('../../../util/Constants');
/**
* Represents a Voice Connection's WebSocket.
* @extends {EventEmitter}
* @private
*/
class VoiceWebSocket extends EventEmitter {
constructor(connection) {
super();
/**
* The Voice Connection that this WebSocket serves
* @type {VoiceConnection}
*/
this.connection = connection;
/**
* How many connection attempts have been made
* @type {number}
*/
this.attempts = 0;
this.dead = false;
this.connection.on('closing', this.shutdown.bind(this));
}
/**
* The client of this voice WebSocket
* @type {Client}
* @readonly
*/
get client() {
return this.connection.client;
}
shutdown() {
this.emit('debug', `[WS] shutdown requested`);
this.dead = true;
this.reset();
}
/**
* Resets the current WebSocket.
*/
reset() {
this.emit('debug', `[WS] reset requested`);
if (this.ws) {
if (this.ws.readyState !== WebSocket.CLOSED) this.ws.close();
this.ws = null;
}
this.clearHeartbeat();
}
/**
* Starts connecting to the Voice WebSocket Server.
*/
connect() {
this.emit('debug', `[WS] connect requested`);
if (this.dead) return;
if (this.ws) this.reset();
if (this.attempts >= 5) {
this.emit('debug', new Error('VOICE_CONNECTION_ATTEMPTS_EXCEEDED', this.attempts));
return;
}
this.attempts++;
/**
* The actual WebSocket used to connect to the Voice WebSocket Server.
* @type {WebSocket}
*/
this.ws = WebSocket.create(`wss://${this.connection.authentication.endpoint}/`, { v: 7 });
this.emit('debug', `[WS] connecting, ${this.attempts} attempts, ${this.ws.url}`);
this.ws.onopen = this.onOpen.bind(this);
this.ws.onmessage = this.onMessage.bind(this);
this.ws.onclose = this.onClose.bind(this);
this.ws.onerror = this.onError.bind(this);
}
/**
* Sends data to the WebSocket if it is open.
* @param {string} data The data to send to the WebSocket
* @returns {Promise<string>}
*/
send(data) {
this.emit('debug', `[WS] >> ${data}`);
return new Promise((resolve, reject) => {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) throw new Error('WS_NOT_OPEN', data);
this.ws.send(data, null, error => {
if (error) reject(error);
else resolve(data);
});
});
}
/**
* JSON.stringify's a packet and then sends it to the WebSocket Server.
* @param {Object} packet The packet to send
* @returns {Promise<string>}
*/
sendPacket(packet) {
try {
packet = JSON.stringify(packet);
} catch (error) {
return Promise.reject(error);
}
return this.send(packet);
}
/**
* Called whenever the WebSocket opens.
*/
onOpen() {
this.emit('debug', `[WS] opened at gateway ${this.connection.authentication.endpoint}`);
this.sendPacket({
op: Opcodes.DISPATCH,
d: {
server_id: this.connection.serverId || this.connection.channel.guild?.id || this.connection.channel.id,
user_id: this.client.user.id,
token: this.connection.authentication.token,
session_id: this.connection.authentication.sessionId,
streams: [{ type: 'screen', rid: '100', quality: 100 }],
video: true,
},
}).catch(() => {
this.emit('error', new Error('VOICE_JOIN_SOCKET_CLOSED'));
});
}
/**
* Called whenever a message is received from the WebSocket.
* @param {MessageEvent} event The message event that was received
* @returns {void}
*/
onMessage(event) {
try {
return this.onPacket(WebSocket.unpack(event.data, 'json'));
} catch (error) {
return this.onError(error);
}
}
/**
* Called whenever the connection to the WebSocket server is lost.
*/
onClose() {
this.emit('debug', `[WS] closed`);
if (!this.dead) setTimeout(this.connect.bind(this), this.attempts * 1000).unref();
}
/**
* Called whenever an error occurs with the WebSocket.
* @param {Error} error The error that occurred
*/
onError(error) {
this.emit('debug', `[WS] Error: ${error}`);
this.emit('error', error);
}
/**
* Called whenever a valid packet is received from the WebSocket.
* @param {Object} packet The received packet
*/
onPacket(packet) {
this.emit('debug', `[WS] << ${JSON.stringify(packet)}`);
switch (packet.op) {
case VoiceOpcodes.HELLO:
this.setHeartbeat(packet.d.heartbeat_interval);
break;
case VoiceOpcodes.READY:
/**
* Emitted once the voice WebSocket receives the ready packet.
* @param {Object} packet The received packet
* @event VoiceWebSocket#ready
*/
this.emit('ready', packet.d);
this.connection.setVideoStatus(false);
break;
/* eslint-disable no-case-declarations */
case VoiceOpcodes.SESSION_DESCRIPTION:
packet.d.secret_key = new Uint8Array(packet.d.secret_key);
/**
* Emitted once the Voice Websocket receives a description of this voice session.
* @param {Object} packet The received packet
* @event VoiceWebSocket#sessionDescription
*/
this.emit('sessionDescription', packet.d);
break;
case VoiceOpcodes.CLIENT_CONNECT:
this.connection.ssrcMap.set(+packet.d.audio_ssrc, {
userId: packet.d.user_id,
speaking: 0,
hasVideo: Boolean(packet.d.video_ssrc),
});
break;
case VoiceOpcodes.CLIENT_DISCONNECT:
const streamInfo = this.connection.receiver && this.connection.receiver.packets.streams.get(packet.d.user_id);
if (streamInfo) {
this.connection.receiver.packets.streams.delete(packet.d.user_id);
streamInfo.stream.push(null);
}
break;
case VoiceOpcodes.SPEAKING:
/**
* Emitted whenever a speaking packet is received.
* @param {Object} data
* @event VoiceWebSocket#startSpeaking
*/
this.emit('startSpeaking', packet.d);
break;
default:
/**
* Emitted when an unhandled packet is received.
* @param {Object} packet
* @event VoiceWebSocket#unknownPacket
*/
this.emit('unknownPacket', packet);
break;
}
}
/**
* Sets an interval at which to send a heartbeat packet to the WebSocket.
* @param {number} interval The interval at which to send a heartbeat packet
*/
setHeartbeat(interval) {
if (!interval || isNaN(interval)) {
this.onError(new Error('VOICE_INVALID_HEARTBEAT'));
return;
}
if (this.heartbeatInterval) {
/**
* Emitted whenever the voice WebSocket encounters a non-fatal error.
* @param {string} warn The warning
* @event VoiceWebSocket#warn
*/
this.emit('warn', 'A voice heartbeat interval is being overwritten');
clearInterval(this.heartbeatInterval);
}
this.heartbeatInterval = setInterval(this.sendHeartbeat.bind(this), interval).unref();
}
/**
* Clears a heartbeat interval, if one exists.
*/
clearHeartbeat() {
if (!this.heartbeatInterval) {
this.emit('warn', 'Tried to clear a heartbeat interval that does not exist');
return;
}
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
/**
* Sends a heartbeat packet.
*/
sendHeartbeat() {
this.sendPacket({ op: VoiceOpcodes.HEARTBEAT, d: Math.floor(Math.random() * 10e10) }).catch(() => {
this.emit('warn', 'Tried to send heartbeat, but connection is not open');
this.clearHeartbeat();
});
}
}
module.exports = VoiceWebSocket;

View File

@@ -0,0 +1,272 @@
'use strict';
/*
Credit: https://github.com/dank074/Discord-video-stream
The use of video streaming in this library is an incomplete implementation with many bugs, primarily aimed at lazy users.
The video streaming features in this library are sourced from https://github.com/dank074/Discord-video-stream.
Please use the @dank074/discord-video-stream library to access all advanced and professional features,
along with comprehensive support. I will not actively fix bugs related to streaming and encourage everyone to
use https://github.com/dank074/Discord-video-stream for stable and smooth streaming.
To reiterate: This is an incomplete implementation of the library https://github.com/dank074/Discord-video-stream.
Thanks to dank074 and longnguyen2004 for implementing new codecs (H264, H265).
Thanks to mrjvs for discovering how Discord transmits data and the VP8 codec.
Please use the @dank074/discord-video-stream library for the best support.
*/
const EventEmitter = require('events');
const { Readable: ReadableStream } = require('stream');
const prism = require('prism-media');
const { H264NalSplitter } = require('./processing/AnnexBNalSplitter');
const { IvfTransformer } = require('./processing/IvfSplitter');
const { H264Dispatcher } = require('../dispatcher/AnnexBDispatcher');
const AudioDispatcher = require('../dispatcher/AudioDispatcher');
const { VP8Dispatcher } = require('../dispatcher/VPxDispatcher');
const FFMPEG_ARGUMENTS = ['-analyzeduration', '0', '-loglevel', '0', '-f', 's16le', '-ar', '48000', '-ac', '2'];
/**
* Player for a Voice Connection.
* @private
* @extends {EventEmitter}
*/
class MediaPlayer extends EventEmitter {
constructor(voiceConnection) {
super();
this.dispatcher = null;
this.videoDispatcher = null;
/**
* The voice connection that the player serves
* @type {VoiceConnection}
*/
this.voiceConnection = voiceConnection;
}
destroy() {
this.destroyDispatcher();
this.destroyVideoDispatcher();
}
destroyDispatcher() {
if (this.dispatcher) {
this.dispatcher.destroy();
this.dispatcher = null;
}
}
destroyVideoDispatcher() {
if (this.videoDispatcher) {
this.videoDispatcher.destroy();
this.videoDispatcher = null;
}
}
playUnknown(input, options, streams = {}) {
this.destroyDispatcher();
const isStream = input instanceof ReadableStream;
const args = isStream ? FFMPEG_ARGUMENTS.slice() : ['-i', input, ...FFMPEG_ARGUMENTS];
if (options.seek) args.unshift('-ss', String(options.seek));
const ffmpeg = new prism.FFmpeg({ args });
streams.ffmpeg = ffmpeg;
if (isStream) {
streams.input = input;
input.pipe(ffmpeg);
}
return this.playPCMStream(ffmpeg, options, streams);
}
playPCMStream(stream, options, streams = {}) {
this.destroyDispatcher();
const opus = (streams.opus = new prism.opus.Encoder({ channels: 2, rate: 48000, frameSize: 960 }));
if (options && options.volume === false) {
stream.pipe(opus);
return this.playOpusStream(opus, options, streams);
}
streams.volume = new prism.VolumeTransformer({ type: 's16le', volume: options ? options.volume : 1 });
stream.pipe(streams.volume).pipe(opus);
return this.playOpusStream(opus, options, streams);
}
playOpusStream(stream, options, streams = {}) {
this.destroyDispatcher();
streams.opus = stream;
if (options.volume !== false && !streams.input) {
streams.input = stream;
const decoder = new prism.opus.Decoder({ channels: 2, rate: 48000, frameSize: 960 });
streams.volume = new prism.VolumeTransformer({ type: 's16le', volume: options ? options.volume : 1 });
streams.opus = stream
.pipe(decoder)
.pipe(streams.volume)
.pipe(new prism.opus.Encoder({ channels: 2, rate: 48000, frameSize: 960 }));
}
const dispatcher = this.createDispatcher(options, streams);
streams.opus.pipe(dispatcher);
return dispatcher;
}
playUnknownVideo(input, options = {}) {
this.destroyVideoDispatcher();
const isStream = input instanceof ReadableStream;
if (!options?.fps) options.fps = 30;
const args = [
'-i',
'-',
'-analyzeduration',
'0',
'-flags',
'low_delay',
'-quality',
'realtime',
'-r',
`${options?.fps}`,
];
if (!isStream) {
args[1] = input;
}
if (options?.hwAccel === true) {
args.unshift('-hwaccel', 'auto');
}
if (options.seek) args.unshift('-ss', String(options.seek));
// Get stream type
if (this.voiceConnection.videoCodec == 'VP8') {
args.push('-f', 'ivf', '-deadline', 'realtime', '-c:v', options?.copy ? 'copy' : 'libvpx', '-speed', '5');
}
if (this.voiceConnection.videoCodec == 'H264') {
args.push(
'-c:v',
options?.copy ? 'copy' : 'libx264',
'-f',
'h264',
'-tune',
'zerolatency',
'-pix_fmt',
'yuv420p',
'-preset',
options?.preset || 'faster',
'-profile:v',
'baseline',
'-g',
`${options?.fps}`,
'-x264-params',
`keyint=${options?.fps}:min-keyint=${options?.fps}`,
'-bf',
'0',
'-bsf:v',
'h264_metadata=aud=insert',
);
}
if (options?.inputFFmpegArgs) {
args.unshift(...options.inputFFmpegArgs);
}
if (options?.outputFFmpegArgs) {
args.push(...options.outputFFmpegArgs);
}
const ffmpeg = new prism.FFmpeg({ args });
const streams = { ffmpeg };
if (isStream) {
streams.input = input;
input.pipe(ffmpeg);
}
this.emit('debug', `[ffmpeg] Spawn process with args:\n${args.join(' ')}`);
ffmpeg.process.stderr.on('data', data => {
this.emit('debug', `[ffmpeg]: ${data.toString()}`);
});
switch (this.voiceConnection.videoCodec) {
case 'VP8': {
return this.playIvfVideo(ffmpeg, options, streams);
}
case 'H264': {
return this.playAnnexBVideo(ffmpeg, options, streams, 'H264');
}
default: {
throw new Error('Invalid codec');
}
}
}
playIvfVideo(stream, options, streams) {
this.destroyVideoDispatcher();
const videoStream = new IvfTransformer();
stream.pipe(videoStream);
streams.video = videoStream;
const dispatcher = this.createVideoDispatcher(options, streams);
videoStream.pipe(dispatcher);
return dispatcher;
}
// eslint-disable-next-line no-unused-vars
playAnnexBVideo(stream, options, streams, type) {
this.destroyVideoDispatcher();
const videoStream = new H264NalSplitter();
stream.pipe(videoStream);
streams.video = videoStream;
const dispatcher = this.createVideoDispatcher(options, streams);
videoStream.pipe(dispatcher);
return dispatcher;
}
createDispatcher(options, streams) {
this.destroyDispatcher();
const dispatcher = (this.dispatcher = new AudioDispatcher(this, options, streams));
return dispatcher;
}
/**
* Create
* @private
* @param {Object} options any
* @param {Object} streams any
* @returns {VideoDispatcher}
*/
createVideoDispatcher(options, streams) {
this.destroyVideoDispatcher();
switch (this.voiceConnection.videoCodec) {
case 'VP8': {
const dispatcher = (this.videoDispatcher = new VP8Dispatcher(
this,
options?.highWaterMark,
streams,
options?.fps,
));
return dispatcher;
}
case 'H264': {
const dispatcher = (this.videoDispatcher = new H264Dispatcher(
this,
options?.highWaterMark,
streams,
options?.fps,
));
return dispatcher;
}
default: {
throw new Error('Invalid codec');
}
}
}
}
module.exports = MediaPlayer;

View File

@@ -0,0 +1,244 @@
'use strict';
/*
Credit: https://github.com/dank074/Discord-video-stream
The use of video streaming in this library is an incomplete implementation with many bugs, primarily aimed at lazy users.
The video streaming features in this library are sourced from https://github.com/dank074/Discord-video-stream.
Please use the @dank074/discord-video-stream library to access all advanced and professional features,
along with comprehensive support. I will not actively fix bugs related to streaming and encourage everyone to
use https://github.com/dank074/Discord-video-stream for stable and smooth streaming.
To reiterate: This is an incomplete implementation of the library https://github.com/dank074/Discord-video-stream.
Thanks to dank074 and longnguyen2004 for implementing new codecs (H264, H265).
Thanks to mrjvs for discovering how Discord transmits data and the VP8 codec.
Please use the @dank074/discord-video-stream library for the best support.
*/
const { Buffer } = require('buffer');
const { Transform } = require('stream');
const H264NalUnitTypes = {
Unspecified: 0,
CodedSliceNonIDR: 1,
CodedSlicePartitionA: 2,
CodedSlicePartitionB: 3,
CodedSlicePartitionC: 4,
CodedSliceIdr: 5,
SEI: 6,
SPS: 7,
PPS: 8,
AccessUnitDelimiter: 9,
EndOfSequence: 10,
EndOfStream: 11,
FillerData: 12,
SEIExtenstion: 13,
PrefixNalUnit: 14,
SubsetSPS: 15,
};
const H265NalUnitTypes = {
TRAIL_N: 0,
TRAIL_R: 1,
TSA_N: 2,
TSA_R: 3,
STSA_N: 4,
STSA_R: 5,
RADL_N: 6,
RADL_R: 7,
RASL_N: 8,
RASL_R: 9,
RSV_VCL_N10: 10,
RSV_VCL_R11: 11,
RSV_VCL_N12: 12,
RSV_VCL_R13: 13,
RSV_VCL_N14: 14,
RSV_VCL_R15: 15,
BLA_W_LP: 16,
BLA_W_RADL: 17,
BLA_N_LP: 18,
IDR_W_RADL: 19,
IDR_N_LP: 20,
CRA_NUT: 21,
RSV_IRAP_VCL22: 22,
RSV_IRAP_VCL23: 23,
RSV_VCL24: 24,
RSV_VCL25: 25,
RSV_VCL26: 26,
RSV_VCL27: 27,
RSV_VCL28: 28,
RSV_VCL29: 29,
RSV_VCL30: 30,
RSV_VCL31: 31,
VPS_NUT: 32,
SPS_NUT: 33,
PPS_NUT: 34,
AUD_NUT: 35,
EOS_NUT: 36,
EOB_NUT: 37,
FD_NUT: 38,
PREFIX_SEI_NUT: 39,
SUFFIX_SEI_NUT: 40,
RSV_NVCL41: 41,
RSV_NVCL42: 42,
RSV_NVCL43: 43,
RSV_NVCL44: 44,
RSV_NVCL45: 45,
RSV_NVCL46: 46,
RSV_NVCL47: 47,
UNSPEC48: 48,
UNSPEC49: 49,
UNSPEC50: 50,
UNSPEC51: 51,
UNSPEC52: 52,
UNSPEC53: 53,
UNSPEC54: 54,
UNSPEC55: 55,
UNSPEC56: 56,
UNSPEC57: 57,
UNSPEC58: 58,
UNSPEC59: 59,
UNSPEC60: 60,
UNSPEC61: 61,
UNSPEC62: 62,
UNSPEC63: 63,
};
const H264Helpers = {
getUnitType(frame) {
return frame[0] & 0x1f;
},
splitHeader(frame) {
return [frame.subarray(0, 1), frame.subarray(1)];
},
isAUD(unitType) {
return unitType === H264NalUnitTypes.AccessUnitDelimiter;
},
};
const H265Helpers = {
getUnitType(frame) {
return (frame[0] >> 1) & 0x3f;
},
splitHeader(frame) {
return [frame.subarray(0, 2), frame.subarray(2)];
},
isAUD(unitType) {
return unitType === H265NalUnitTypes.AUD_NUT;
},
};
const emptyBuffer = Buffer.allocUnsafe(0);
const epbPrefix = Buffer.from([0x00, 0x00, 0x03]);
const nalSuffix = Buffer.from([0x00, 0x00, 0x01]);
class AnnexBNalSplitter extends Transform {
constructor(nalFunctions) {
super();
this._buffer = null;
this._accessUnit = [];
this._nalFunctions = nalFunctions;
}
rbsp(data) {
const newData = Buffer.allocUnsafe(data.length);
let newLength = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
const epbsPos = data.indexOf(epbPrefix);
if (epbsPos === -1) {
data.copy(newData, newLength);
newLength += data.length;
break;
}
const copyRange = data[epbsPos + 3] <= 0x03 ? epbsPos + 2 : epbsPos + 3;
data.copy(newData, newLength, 0, copyRange);
newLength += copyRange;
data = data.subarray(epbsPos + 3);
}
return newData.subarray(0, newLength);
}
findNalStart(buf) {
const pos = buf.indexOf(nalSuffix);
if (pos === -1) return null;
return pos > 0 && buf[pos - 1] === 0 ? { index: pos - 1, length: 4 } : { index: pos, length: 3 };
}
processFrame(frame) {
if (frame.length === 0) return;
const unitType = this._nalFunctions.getUnitType(frame);
if (this._nalFunctions.isAUD(unitType) && this._accessUnit.length > 0) {
const sizeOfAccessUnit = this._accessUnit.reduce((acc, nalu) => acc + nalu.length + 4, 0);
const accessUnitBuf = Buffer.allocUnsafe(sizeOfAccessUnit);
let offset = 0;
this._accessUnit.forEach(nalu => {
accessUnitBuf.writeUint32BE(nalu.length, offset);
offset += 4;
nalu.copy(accessUnitBuf, offset);
offset += nalu.length;
});
this.push(accessUnitBuf);
this._accessUnit = [];
} else {
this._accessUnit.push(this.removeEpbs(frame, unitType));
}
}
_transform(chunk, encoding, callback) {
let nalStart = this.findNalStart(chunk);
if (!this._buffer) {
if (!nalStart) {
callback();
return;
}
chunk = chunk.subarray(nalStart.index + nalStart.length);
this._buffer = emptyBuffer;
}
chunk = Buffer.concat([this._buffer, chunk]);
while ((nalStart = this.findNalStart(chunk))) {
const frame = chunk.subarray(0, nalStart.index);
this.processFrame(frame);
chunk = chunk.subarray(nalStart.index + nalStart.length);
}
this._buffer = chunk;
callback();
}
}
class H264NalSplitter extends AnnexBNalSplitter {
constructor() {
super(H264Helpers);
}
removeEpbs(frame, unitType) {
return unitType === H264NalUnitTypes.SPS || unitType === H264NalUnitTypes.SEI ? this.rbsp(frame) : frame;
}
}
class H265NalSplitter extends AnnexBNalSplitter {
constructor() {
super(H265Helpers);
}
removeEpbs(frame) {
return frame; // No specific processing for H265
}
}
module.exports = {
H264NalUnitTypes,
H265NalUnitTypes,
H264Helpers,
H265Helpers,
H264NalSplitter,
H265NalSplitter,
};

View File

@@ -0,0 +1,106 @@
'use strict';
/*
Credit: https://github.com/dank074/Discord-video-stream
The use of video streaming in this library is an incomplete implementation with many bugs, primarily aimed at lazy users.
The video streaming features in this library are sourced from https://github.com/dank074/Discord-video-stream.
Please use the @dank074/discord-video-stream library to access all advanced and professional features,
along with comprehensive support. I will not actively fix bugs related to streaming and encourage everyone to
use https://github.com/dank074/Discord-video-stream for stable and smooth streaming.
To reiterate: This is an incomplete implementation of the library https://github.com/dank074/Discord-video-stream.
Thanks to dank074 and longnguyen2004 for implementing new codecs (H264, H265).
Thanks to mrjvs for discovering how Discord transmits data and the VP8 codec.
Please use the @dank074/discord-video-stream library for the best support.
*/
const { Buffer } = require('buffer');
const { Transform } = require('stream');
class IvfTransformer extends Transform {
constructor(options) {
super(options);
this.headerSize = 32;
this.frameHeaderSize = 12;
this.header = null;
this.buf = null;
this.retFullFrame = options && options.fullframe ? options.fullframe : false;
}
_parseHeader(header) {
this.header = {
signature: header.subarray(0, 4).toString(),
version: header.readUIntLE(4, 2),
headerLength: header.readUIntLE(6, 2),
codec: header.subarray(8, 12).toString(),
width: header.readUIntLE(12, 2),
height: header.readUIntLE(14, 2),
timeDenominator: header.readUIntLE(16, 4),
timeNumerator: header.readUIntLE(20, 4),
frameCount: header.readUIntLE(24, 4),
};
}
_getFrameSize(buf) {
return buf.readUIntLE(0, 4);
}
_parseFrame(frame) {
const size = this._getFrameSize(frame);
if (this.retFullFrame) return this.push(frame.subarray(0, 12 + size));
const out = {
size: size,
timestamp: frame.readBigUInt64LE(4),
data: frame.subarray(12, 12 + size),
};
return this.push(out.data);
}
_appendChunkToBuf(chunk) {
if (this.buf) this.buf = Buffer.concat([this.buf, chunk]);
else this.buf = chunk;
}
_updateBufLen(size) {
if (this.buf.length > size) this.buf = this.buf.subarray(size, this.buf.length);
else this.buf = null;
}
_transform(chunk, encoding, callback) {
this._appendChunkToBuf(chunk);
if (!this.header) {
if (this.buf.length >= this.headerSize) {
this._parseHeader(this.buf.subarray(0, this.headerSize));
this._updateBufLen(this.headerSize);
} else {
callback();
return;
}
}
while (this.buf && this.buf.length >= this.frameHeaderSize) {
const size = this._getFrameSize(this.buf) + this.frameHeaderSize;
if (this.buf.length >= size) {
this._parseFrame(this.buf.subarray(0, size));
this._updateBufLen(size);
} else {
break;
}
}
callback();
}
}
module.exports = {
IvfTransformer,
};

View File

@@ -0,0 +1,136 @@
'use strict';
const EventEmitter = require('events');
const { Buffer } = require('node:buffer');
const { setTimeout } = require('node:timers');
const Speaking = require('../../../util/Speaking');
const secretbox = require('../util/Secretbox');
const { SILENCE_FRAME } = require('../util/Silence');
// The delay between packets when a user is considered to have stopped speaking
// https://github.com/discordjs/discord.js/issues/3524#issuecomment-540373200
const DISCORD_SPEAKING_DELAY = 250;
class Readable extends require('stream').Readable {
_read() {} // eslint-disable-line no-empty-function
}
class PacketHandler extends EventEmitter {
constructor(receiver) {
super();
this.nonce = Buffer.alloc(24);
this.receiver = receiver;
this.streams = new Map();
this.speakingTimeouts = new Map();
}
get connection() {
return this.receiver.connection;
}
_stoppedSpeaking(userId) {
const streamInfo = this.streams.get(userId);
if (streamInfo && streamInfo.end === 'silence') {
this.streams.delete(userId);
streamInfo.stream.push(null);
}
}
makeStream(user, end) {
if (this.streams.has(user)) return this.streams.get(user).stream;
const stream = new Readable();
stream.on('end', () => this.streams.delete(user));
this.streams.set(user, { stream, end });
return stream;
}
parseBuffer(buffer) {
const { secret_key, mode } = this.receiver.connection.authentication;
// Choose correct nonce depending on encryption
let end;
if (mode === 'xsalsa20_poly1305_lite') {
buffer.copy(this.nonce, 0, buffer.length - 4);
end = buffer.length - 4;
} else if (mode === 'xsalsa20_poly1305_suffix') {
buffer.copy(this.nonce, 0, buffer.length - 24);
end = buffer.length - 24;
} else {
buffer.copy(this.nonce, 0, 0, 12);
}
// Open packet
let packet = secretbox.methods.open(buffer.slice(12, end), this.nonce, secret_key);
if (!packet) return new Error('Failed to decrypt voice packet');
packet = Buffer.from(packet);
// Strip RTP Header Extensions (one-byte only)
if (packet[0] === 0xbe && packet[1] === 0xde) {
const headerExtensionLength = packet.readUInt16BE(2);
packet = packet.subarray(4 + 4 * headerExtensionLength);
}
return packet;
}
push(buffer) {
const ssrc = buffer.readUInt32BE(8);
const userStat = this.connection.ssrcMap.get(ssrc);
if (!userStat) return;
let opusPacket;
const streamInfo = this.streams.get(userStat.userId);
// If the user is in video, we need to check if the packet is just silence
if (userStat.hasVideo) {
opusPacket = this.parseBuffer(buffer);
if (opusPacket instanceof Error) {
// Only emit an error if we were actively receiving packets from this user
if (streamInfo) {
this.emit('error', opusPacket);
return;
}
}
if (SILENCE_FRAME.equals(opusPacket)) {
// If this is a silence frame, pretend we never received it
return;
}
}
let speakingTimeout = this.speakingTimeouts.get(ssrc);
if (typeof speakingTimeout === 'undefined') {
// Ensure at least the speaking bit is set.
// As the object is by reference, it's only needed once per client re-connect.
if (userStat.speaking === 0) {
userStat.speaking = Speaking.FLAGS.SPEAKING;
}
this.connection.onSpeaking({ user_id: userStat.userId, ssrc: ssrc, speaking: userStat.speaking });
speakingTimeout = setTimeout(() => {
try {
this.connection.onSpeaking({ user_id: userStat.userId, ssrc: ssrc, speaking: 0 });
clearTimeout(speakingTimeout);
this.speakingTimeouts.delete(ssrc);
} catch {
// Connection already closed, ignore
}
}, DISCORD_SPEAKING_DELAY).unref();
this.speakingTimeouts.set(ssrc, speakingTimeout);
} else {
speakingTimeout.refresh();
}
if (streamInfo) {
const { stream } = streamInfo;
if (!opusPacket) {
opusPacket = this.parseBuffer(buffer);
if (opusPacket instanceof Error) {
this.emit('error', opusPacket);
return;
}
}
stream.push(opusPacket);
}
}
}
module.exports = PacketHandler;

View File

@@ -0,0 +1,58 @@
'use strict';
const EventEmitter = require('events');
const prism = require('prism-media');
const PacketHandler = require('./PacketHandler');
const { Error } = require('../../../errors');
/**
* Receives audio packets from a voice connection.
* @example
* const receiver = connection.createReceiver();
* // opusStream is a ReadableStream - that means you could play it back to a voice channel if you wanted to!
* const opusStream = receiver.createStream(user);
*/
class VoiceReceiver extends EventEmitter {
constructor(connection) {
super();
this.connection = connection;
this.packets = new PacketHandler(this);
/**
* Emitted whenever there is a warning
* @event VoiceReceiver#debug
* @param {Error|string} error The error or message to debug
*/
this.packets.on('error', err => this.emit('debug', err));
}
/**
* Options passed to `VoiceReceiver#createStream`.
* @typedef {Object} ReceiveStreamOptions
* @property {string} [mode='opus'] The mode for audio output. This defaults to opus, meaning discord.js won't decode
* the packets for you. You can set this to 'pcm' so that the stream's output will be 16-bit little-endian stereo
* audio
* @property {string} [end='silence'] When the stream should be destroyed. If `silence`, this will be when the user
* stops talking. Otherwise, if `manual`, this should be handled by you.
*/
/**
* Creates a new audio receiving stream. If a stream already exists for a user, then that stream will be returned
* rather than generating a new one.
* @param {UserResolvable} user The user to start listening to.
* @param {ReceiveStreamOptions} options Options.
* @returns {ReadableStream}
*/
createStream(user, { mode = 'opus', end = 'silence' } = {}) {
user = this.connection.client.users.resolve(user);
if (!user) throw new Error('VOICE_USER_MISSING');
const stream = this.packets.makeStream(user.id, end);
if (mode === 'pcm') {
const decoder = new prism.opus.Decoder({ channels: 2, rate: 48000, frameSize: 960 });
stream.pipe(decoder);
return decoder;
}
return stream;
}
}
module.exports = VoiceReceiver;

View File

@@ -0,0 +1,14 @@
'use strict';
function parseStreamKey(key) {
const Arr = key.split(':');
const type = Arr[0];
const guildId = type == 'guild' ? Arr[1] : null;
const channelId = type == 'guild' ? Arr[2] : Arr[1];
const userId = type == 'guild' ? Arr[3] : Arr[2];
return { type, guildId, channelId, userId };
}
module.exports = {
parseStreamKey,
};

View File

@@ -0,0 +1,121 @@
'use strict';
const { Readable } = require('stream');
const prism = require('prism-media');
const { Error } = require('../../../errors');
/**
* Options that can be passed to stream-playing methods:
* @typedef {Object} StreamOptions
* @property {StreamType} [type='unknown'] The type of stream.
* @property {number} [seek=0] The time to seek to, will be ignored when playing `ogg/opus` or `webm/opus` streams
* @property {number|boolean} [volume=1] The volume to play at. Set this to false to disable volume transforms for
* this stream to improve performance.
* @property {number} [plp] Expected packet loss percentage
* @property {boolean} [fec] Enabled forward error correction
* @property {number|string} [bitrate=96] The bitrate (quality) of the audio in kbps.
* If set to 'auto', the voice channel's bitrate will be used
* @property {number} [highWaterMark=12] The maximum number of opus packets to make and store before they are
* actually needed. See https://nodejs.org/en/docs/guides/backpressuring-in-streams/. Setting this value to
* 1 means that changes in volume will be more instant.
*/
/**
* An option passed as part of `StreamOptions` specifying the type of the stream.
* * `unknown`: The default type, streams/input will be passed through to ffmpeg before encoding.
* Will play most streams.
* * `converted`: Play a stream of 16bit signed stereo PCM data, skipping ffmpeg.
* * `opus`: Play a stream of opus packets, skipping ffmpeg. You lose the ability to alter volume.
* * `ogg/opus`: Play an ogg file with the opus encoding, skipping ffmpeg. You lose the ability to alter volume.
* * `webm/opus`: Play a webm file with opus audio, skipping ffmpeg. You lose the ability to alter volume.
* @typedef {string} StreamType
*/
/**
* An interface class to allow you to play audio over VoiceConnections.
*/
class PlayInterface {
constructor(player) {
this.player = player;
}
/**
* Play an audio resource.
* @param {ReadableStream|string} resource The resource to play.
* @param {StreamOptions} [options] The options to play.
* @example
* // Play a local audio file
* connection.playAudio('/home/hydrabolt/audio.mp3', { volume: 0.5 });
* @example
* // Play a ReadableStream
* connection.playAudio(ytdl('https://www.youtube.com/watch?v=ZlAU_w7-Xp8', { quality: 'highestaudio' }));
* @example
* // Using different protocols: https://ffmpeg.org/ffmpeg-protocols.html
* connection.playAudio('http://www.sample-videos.com/audio/mp3/wave.mp3');
* @returns {AudioDispatcher}
*/
playAudio(resource, options = {}) {
if (resource instanceof Readable || typeof resource === 'string') {
const type = options.type || 'unknown';
if (type === 'unknown') {
return this.player.playUnknown(resource, options);
} else if (type === 'converted') {
return this.player.playPCMStream(resource, options);
} else if (type === 'opus') {
return this.player.playOpusStream(resource, options);
} else if (type === 'ogg/opus') {
if (!(resource instanceof Readable)) throw new Error('VOICE_PRISM_DEMUXERS_NEED_STREAM');
return this.player.playOpusStream(resource.pipe(new prism.opus.OggDemuxer()), options);
} else if (type === 'webm/opus') {
if (!(resource instanceof Readable)) throw new Error('VOICE_PRISM_DEMUXERS_NEED_STREAM');
return this.player.playOpusStream(resource.pipe(new prism.opus.WebmDemuxer()), options);
}
}
throw new Error('VOICE_PLAY_INTERFACE_BAD_TYPE');
}
/**
* Options that can be passed to stream-playing methods:
* @typedef {Object} VideoOptions
* @property {number} [seek=0] The time to seek to, will be ignored when playing `ogg/opus` or `webm/opus` streams
* @property {number} [fps=30] Video fps
* @property {boolean} [copy=false] Copy codec ?
* @property {number} [highWaterMark=12] The maximum number of opus packets to make and store before they are
* actually needed. See https://nodejs.org/en/docs/guides/backpressuring-in-streams/. Setting this value to
* 1 means that changes in volume will be more instant.
* @property {'ultrafast' | 'superfast' | 'veryfast' | 'faster' | 'fast' | 'medium' | 'slow' | 'slower' | 'veryslow'} [preset='veryfast'] ffmpeg preset
* @property {boolean} [hwAccel=false] Enables hardware accelerated video decoding. Enabling this option might result in an exception
* being thrown by Ffmpeg process if your system does not support hardware acceleration
* @property {string[]} [inputFFmpegArgs] input ffmpeg
* Ex: ['-config1', 'value1', '-config2', 'value2']
* @property {string[]} [outputFFmpegArgs] output ffmpeg
* Ex: ['-config1', 'value1', '-config2', 'value2']
*/
/**
* Play an video resource.
* @param {ReadableStream|string} resource The resource to play.
* @param {VideoOptions} [options] The options to play.
* @example
* // Play a local video file
* connection.playVideo('/home/hydrabolt/video.mp4');
* @example
* // Using different protocols: https://ffmpeg.org/ffmpeg-protocols.html
* connection.playVideo('http://www.sample-videos.com/video/mp4/wave.mp4');
* @returns {VideoDispatcher}
*/
playVideo(resource, options = {}) {
if (resource instanceof Readable || typeof resource === 'string') {
return this.player.playUnknownVideo(resource, options);
}
throw new Error('VOICE_PLAY_INTERFACE_BAD_TYPE');
}
static applyToClass(structure) {
for (const prop of ['playAudio', 'playVideo']) {
Object.defineProperty(structure.prototype, prop, Object.getOwnPropertyDescriptor(PlayInterface.prototype, prop));
}
}
}
module.exports = PlayInterface;

View File

@@ -0,0 +1,32 @@
'use strict';
const libs = {
sodium: sodium => ({
open: sodium.api.crypto_secretbox_open_easy,
close: sodium.api.crypto_secretbox_easy,
random: n => sodium.randombytes_buf(n),
}),
'libsodium-wrappers': sodium => ({
open: sodium.crypto_secretbox_open_easy,
close: sodium.crypto_secretbox_easy,
random: n => sodium.randombytes_buf(n),
}),
tweetnacl: tweetnacl => ({
open: tweetnacl.secretbox.open,
close: tweetnacl.secretbox,
random: n => tweetnacl.randomBytes(n),
}),
};
exports.methods = {};
(async () => {
for (const libName of Object.keys(libs)) {
try {
const lib = require(libName);
if (libName === 'libsodium-wrappers' && lib.ready) await lib.ready; // eslint-disable-line no-await-in-loop
exports.methods = libs[libName](lib);
break;
} catch {} // eslint-disable-line no-empty
}
})();

View File

@@ -0,0 +1,16 @@
'use strict';
const { Buffer } = require('node:buffer');
const { Readable } = require('stream');
const SILENCE_FRAME = Buffer.from([0xf8, 0xff, 0xfe]);
class Silence extends Readable {
_read() {
this.push(SILENCE_FRAME);
}
}
Silence.SILENCE_FRAME = SILENCE_FRAME;
module.exports = Silence;

View File

@@ -0,0 +1,63 @@
'use strict';
/*
Credit: https://github.com/dank074/Discord-video-stream
The use of video streaming in this library is an incomplete implementation with many bugs, primarily aimed at lazy users.
The video streaming features in this library are sourced from https://github.com/dank074/Discord-video-stream.
Please use the @dank074/discord-video-stream library to access all advanced and professional features,
along with comprehensive support. I will not actively fix bugs related to streaming and encourage everyone to
use https://github.com/dank074/Discord-video-stream for stable and smooth streaming.
To reiterate: This is an incomplete implementation of the library https://github.com/dank074/Discord-video-stream.
Thanks to dank074 and longnguyen2004 for implementing new codecs (H264, H265).
Thanks to mrjvs for discovering how Discord transmits data and the VP8 codec.
Please use the @dank074/discord-video-stream library for the best support.
*/
const fs = require('fs');
const net = require('net');
const path = require('path');
const process = require('process');
let counter = 0;
class UnixStream {
constructor(stream, onSocket) {
if (process.platform === 'win32') {
const pipePrefix = '\\\\.\\pipe\\';
const pipeName = `node-webrtc.${++counter}.sock`;
this.socketPath = path.join(pipePrefix, pipeName);
this.url = this.socketPath;
} else {
this.socketPath = `./${++counter}.sock`;
this.url = `unix:${this.socketPath}`;
}
try {
fs.statSync(this.socketPath);
fs.unlinkSync(this.socketPath);
} catch (err) {
console.error('UnixStream', err);
}
const server = net.createServer(onSocket);
stream.on('finish', () => {
server.close();
});
server.listen(this.socketPath);
}
}
function StreamInput(stream) {
return new UnixStream(stream, socket => stream.pipe(socket));
}
function StreamOutput(stream) {
return new UnixStream(stream, socket => socket.pipe(stream));
}
module.exports = { StreamOutput, StreamInput };

View File

@@ -0,0 +1,104 @@
'use strict';
const EventEmitter = require('events');
const { Buffer } = require('node:buffer');
/**
* An interface class for volume transformation.
* @extends {EventEmitter}
*/
class VolumeInterface extends EventEmitter {
constructor({ volume = 1 } = {}) {
super();
this.setVolume(volume);
}
/**
* Whether or not the volume of this stream is editable
* @type {boolean}
* @readonly
*/
get volumeEditable() {
return true;
}
/**
* The current volume of the stream
* @type {number}
* @readonly
*/
get volume() {
return this._volume;
}
/**
* The current volume of the stream in decibels
* @type {number}
* @readonly
*/
get volumeDecibels() {
return Math.log10(this.volume) * 20;
}
/**
* The current volume of the stream from a logarithmic scale
* @type {number}
* @readonly
*/
get volumeLogarithmic() {
return Math.pow(this.volume, 1 / 1.660964);
}
applyVolume(buffer, volume) {
volume = volume || this._volume;
if (volume === 1) return buffer;
const out = Buffer.alloc(buffer.length);
for (let i = 0; i < buffer.length; i += 2) {
if (i >= buffer.length - 1) break;
const uint = Math.min(32767, Math.max(-32767, Math.floor(volume * buffer.readInt16LE(i))));
out.writeInt16LE(uint, i);
}
return out;
}
/**
* Sets the volume relative to the input stream - i.e. 1 is normal, 0.5 is half, 2 is double.
* @param {number} volume The volume that you want to set
*/
setVolume(volume) {
/**
* Emitted when the volume of this interface changes.
* @event VolumeInterface#volumeChange
* @param {number} oldVolume The old volume of this interface
* @param {number} newVolume The new volume of this interface
*/
this.emit('volumeChange', this._volume, volume);
this._volume = volume;
}
/**
* Sets the volume in decibels.
* @param {number} db The decibels
*/
setVolumeDecibels(db) {
this.setVolume(Math.pow(10, db / 20));
}
/**
* Sets the volume so that a perceived value of 0.5 is half the perceived volume etc.
* @param {number} value The value for the volume
*/
setVolumeLogarithmic(value) {
this.setVolume(Math.pow(value, 1.660964));
}
}
const props = ['volumeDecibels', 'volumeLogarithmic', 'setVolumeDecibels', 'setVolumeLogarithmic'];
exports.applyToClass = function applyToClass(structure) {
for (const prop of props) {
Object.defineProperty(structure.prototype, prop, Object.getOwnPropertyDescriptor(VolumeInterface.prototype, prop));
}
};