feat(VoiceConnection): New class StreamConnectionReadonly

This commit is contained in:
Elysia
2024-10-27 02:30:45 +07:00
parent f50aeefc10
commit c6a905cb80
7 changed files with 392 additions and 26 deletions

View File

@@ -2,6 +2,7 @@
const EventEmitter = require('events');
const { setTimeout } = require('node:timers');
const { Collection } = require('@discordjs/collection');
const VoiceUDP = require('./networking/VoiceUDPClient');
const VoiceWebSocket = require('./networking/VoiceWebSocket');
const MediaPlayer = require('./player/MediaPlayer');
@@ -134,9 +135,10 @@ class VoiceConnection extends EventEmitter {
/**
* Video codec
* * `VP8`
* * `VP9` (Not supported)
* * `VP9` (Not supported for encoding)
* * `H264`
* * `H265` (Not supported)
* * `H265` (Not supported for encoding, worked for decoding)
* * `AV1` (Not supported for encoding)
* @typedef {string} VideoCodec
*/
@@ -151,6 +153,12 @@ class VoiceConnection extends EventEmitter {
* @type {?StreamConnection}
*/
this.streamConnection = null;
/**
* All stream watch connection
* @type {Collection<Snowflake, StreamConnectionReadonly>}
*/
this.streamWatchConnection = new Collection();
}
/**
@@ -653,18 +661,17 @@ class VoiceConnection extends EventEmitter {
if (!this.eventHook) {
this.eventHook = true; // Dont listen this event two times :/
this.channel.client.on('raw', packet => {
if (
typeof packet !== 'object' ||
!packet.t ||
!packet.d ||
!this.streamConnection ||
!packet.d?.stream_key
) {
if (typeof packet !== 'object' || !packet.t || !packet.d || !packet.d?.stream_key) {
return;
}
const { t: event, d: data } = packet;
const StreamKey = parseStreamKey(data.stream_key);
if (StreamKey.userId === this.channel.client.user.id && this.channel.id == StreamKey.channelId) {
if (
StreamKey.userId === this.channel.client.user.id &&
this.channel.id == StreamKey.channelId &&
this.streamConnection
) {
// Current user stream
switch (event) {
case 'STREAM_CREATE': {
this.streamConnection.setSessionId(this.authentication.sessionId);
@@ -681,6 +688,25 @@ class VoiceConnection extends EventEmitter {
}
}
}
if (this.streamWatchConnection.has(StreamKey.userId) && this.channel.id == StreamKey.channelId) {
const streamConnection = this.streamWatchConnection.get(StreamKey.userId);
// Watch user stream
switch (event) {
case 'STREAM_CREATE': {
streamConnection.setSessionId(this.authentication.sessionId);
streamConnection.serverId = data.rtc_server_id;
break;
}
case 'STREAM_SERVER_UPDATE': {
streamConnection.setTokenAndEndpoint(data.token, data.endpoint);
break;
}
case 'STREAM_DELETE': {
streamConnection.disconnect();
break;
}
}
}
});
}
@@ -712,6 +738,112 @@ class VoiceConnection extends EventEmitter {
}
});
}
/**
* Watch user stream
* @param {UserResolvable} user Discord user
* @returns {Promise<StreamConnectionReadonly>}
*/
joinStreamConnection(user) {
const userId = this.client.users.resolveId(user);
// Check if user is streaming
if (!userId) {
return Promise.reject(new Error('VOICE_USER_MISSING'));
}
const voiceState = this.channel.guild?.voiceStates.cache.get(userId) || this.client.voiceStates.cache.get(userId);
if (!voiceState || !voiceState.streaming) {
return Promise.reject(new Error('VOICE_USER_NOT_STREAMING'));
}
// eslint-disable-next-line consistent-return
return new Promise((resolve, reject) => {
if (this.streamWatchConnection.has(userId)) {
return resolve(this.streamWatchConnection.get(userId));
} else {
const connection = new StreamConnectionReadonly(this.voiceManager, this.channel, this, userId);
this.streamWatchConnection.set(userId, connection);
connection.setVideoCodec(this.videoCodec);
// Setup event...
if (!this.eventHook) {
this.eventHook = true; // Dont listen this event two times :/
this.channel.client.on('raw', packet => {
if (typeof packet !== 'object' || !packet.t || !packet.d || !packet.d?.stream_key) {
return;
}
const { t: event, d: data } = packet;
const StreamKey = parseStreamKey(data.stream_key);
if (
StreamKey.userId === this.channel.client.user.id &&
this.channel.id == StreamKey.channelId &&
this.streamConnection
) {
// Current user stream
switch (event) {
case 'STREAM_CREATE': {
this.streamConnection.setSessionId(this.authentication.sessionId);
this.streamConnection.serverId = data.rtc_server_id;
break;
}
case 'STREAM_SERVER_UPDATE': {
this.streamConnection.setTokenAndEndpoint(data.token, data.endpoint);
break;
}
case 'STREAM_DELETE': {
this.streamConnection.disconnect();
break;
}
}
}
if (this.streamWatchConnection.has(StreamKey.userId) && this.channel.id == StreamKey.channelId) {
const streamConnection = this.streamWatchConnection.get(StreamKey.userId);
// Watch user stream
switch (event) {
case 'STREAM_CREATE': {
streamConnection.setSessionId(this.authentication.sessionId);
streamConnection.serverId = data.rtc_server_id;
break;
}
case 'STREAM_SERVER_UPDATE': {
streamConnection.setTokenAndEndpoint(data.token, data.endpoint);
break;
}
case 'STREAM_DELETE': {
streamConnection.disconnect();
break;
}
}
}
});
}
connection.sendSignalScreenshare();
connection.on('debug', msg =>
this.channel.client.emit(
'debug',
`[VOICE STREAM WATCH (${userId}>${this.channel.guild?.id || this.channel.id}:${
connection.status
})]: ${msg}`,
),
);
connection.once('failed', reason => {
this.streamWatchConnection.delete(userId);
reject(reason);
});
connection.on('error', reject);
connection.once('authenticated', () => {
connection.once('ready', () => {
resolve(connection);
connection.removeListener('error', reject);
});
connection.once('disconnect', () => {
this.streamWatchConnection.delete(userId);
});
});
}
});
}
}
/**
@@ -764,6 +896,10 @@ class StreamConnection extends VoiceConnection {
return Promise.resolve(this);
}
joinStreamConnection() {
throw new Error('STREAM_CANNOT_JOIN');
}
get streamConnection() {
return this;
}
@@ -772,6 +908,14 @@ class StreamConnection extends VoiceConnection {
// Why ?
}
get streamWatchConnection() {
return new Collection();
}
set streamWatchConnection(value) {
// Why ?
}
disconnect() {
if (this.#requestDisconnect) return;
this.emit('closing');
@@ -843,6 +987,127 @@ class StreamConnection extends VoiceConnection {
}
}
/**
* Represents a connection to a guild's voice server.
* ```js
* // Obtained using:
* client.voice.joinChannel(channel)
* .then(connection => connection.createStreamConnection())
* .then(connection => {
*
* });
* ```
* @extends {VoiceConnection}
*/
class StreamConnectionReadonly extends VoiceConnection {
#requestDisconnect = false;
/**
* @param {ClientVoiceManager} voiceManager Voice manager
* @param {Channel} channel any channel (joinable)
* @param {VoiceConnection} voiceConnection parent
* @param {Snowflake} userId User ID
*/
constructor(voiceManager, channel, voiceConnection, userId) {
super(voiceManager, channel);
/**
* Current voice connection
* @type {VoiceConnection}
*/
this.voiceConnection = voiceConnection;
/**
* User ID (who started the stream)
* @type {Snowflake}
*/
this.userId = userId;
Object.defineProperty(this, 'voiceConnection', {
value: voiceConnection,
writable: false,
});
/**
* Server Id
* @type {string | null}
*/
this.serverId = null;
}
createStreamConnection() {
throw new Error('STREAM_CONNECTION_READONLY');
}
joinStreamConnection() {
return Promise.resolve(this);
}
get streamConnection() {
return null;
}
set streamConnection(value) {
// Why ?
}
get streamWatchConnection() {
return new Collection();
}
set streamWatchConnection(value) {
// Why ?
}
disconnect() {
if (this.#requestDisconnect) return;
this.emit('closing');
this.emit('debug', 'Stream: disconnect() triggered');
clearTimeout(this.connectTimeout);
this.voiceConnection.streamWatchConnection.delete(this.userId);
this.sendStopScreenshare();
this._disconnect();
}
/**
* Create new stream connection (WS packet)
* @returns {void}
*/
sendSignalScreenshare() {
this.emit('debug', `Signal Stream Watch: ${this.streamKey}`);
return this.channel.client.ws.broadcast({
op: Opcodes.STREAM_WATCH,
d: {
stream_key: this.streamKey,
},
});
}
/**
* Stop screenshare, delete this connection (WS)
* @returns {void}
* @private Using StreamConnection#disconnect()
*/
sendStopScreenshare() {
this.#requestDisconnect = true;
this.channel.client.ws.broadcast({
op: Opcodes.STREAM_DELETE,
d: {
stream_key: this.streamKey,
},
});
}
/**
* Current stream key
* @type {string}
*/
get streamKey() {
return `${['DM', 'GROUP_DM'].includes(this.channel.type) ? 'call' : `guild:${this.channel.guild.id}`}:${
this.channel.id
}:${this.userId}`;
}
}
PlayInterface.applyToClass(VoiceConnection);
PlayInterface.applyToClass(StreamConnection);

View File

@@ -19,11 +19,12 @@ Please use the @dank074/discord-video-stream library for the best support.
const { Buffer } = require('buffer');
const VideoDispatcher = require('./VideoDispatcher');
const Util = require('../../../util/Util');
const { H264Helpers, H265Helpers } = require('../player/processing/AnnexBNalSplitter');
class AnnexBDispatcher extends VideoDispatcher {
constructor(player, highWaterMark = 12, streams, fps, nalFunctions) {
super(player, highWaterMark, streams, fps);
constructor(player, highWaterMark = 12, streams, fps, nalFunctions, payloadType) {
super(player, highWaterMark, streams, fps, payloadType);
this._nalFunctions = nalFunctions;
}
@@ -66,7 +67,7 @@ class AnnexBDispatcher extends VideoDispatcher {
class H264Dispatcher extends AnnexBDispatcher {
constructor(player, highWaterMark = 12, streams, fps) {
super(player, highWaterMark, streams, fps, H264Helpers);
super(player, highWaterMark, streams, fps, H264Helpers, Util.getPayloadType('H264'));
}
makeFragmentationUnitHeader(isFirstPacket, isLastPacket, naluHeader) {
@@ -91,7 +92,7 @@ class H264Dispatcher extends AnnexBDispatcher {
class H265Dispatcher extends AnnexBDispatcher {
constructor(player, highWaterMark = 12, streams, fps) {
super(player, highWaterMark, streams, fps, H265Helpers);
super(player, highWaterMark, streams, fps, H265Helpers, Util.getPayloadType('H265'));
}
makeFragmentationUnitHeader(isFirstPacket, isLastPacket, naluHeader) {

View File

@@ -19,10 +19,11 @@ Please use the @dank074/discord-video-stream library for the best support.
const { Buffer } = require('node:buffer');
const VideoDispatcher = require('./VideoDispatcher');
const Util = require('../../../util/Util');
class VP8Dispatcher extends VideoDispatcher {
constructor(player, highWaterMark = 12, streams, fps) {
super(player, highWaterMark, streams, fps);
super(player, highWaterMark, streams, fps, Util.getPayloadType('VP8'));
}
makeChunk(buffer, isFirstFrame) {

View File

@@ -14,8 +14,8 @@ const BaseDispatcher = require('./BaseDispatcher');
* @extends {BaseDispatcher}
*/
class VideoDispatcher extends BaseDispatcher {
constructor(player, highWaterMark = 12, streams, fps) {
super(player, highWaterMark, 101, true, streams);
constructor(player, highWaterMark = 12, streams, fps, payloadType) {
super(player, highWaterMark, payloadType, true, streams);
this.fps = fps;
}

View File

@@ -6,6 +6,7 @@ const { isIPv4 } = require('net');
const { Buffer } = require('node:buffer');
const { Error } = require('../../../errors');
const { VoiceOpcodes } = require('../../../util/Constants');
const Util = require('../../../util/Util');
/**
* Represents a UDP client for a Voice Connection.
@@ -132,15 +133,7 @@ class VoiceConnectionUDPClient extends EventEmitter {
priority: 1000,
payload_type: 120,
},
{
name: this.voiceConnection.videoCodec,
type: 'video',
priority: 1000,
payload_type: 101,
rtx_payload_type: 102,
encode: true,
decode: true,
},
...Util.getAllPayloadType(),
],
data: {
address: packet.address,

View File

@@ -204,6 +204,10 @@ const Messages = {
UDP_WRONG_HANDSHAKE: 'Wrong handshake packet for UDP',
INVALID_VIDEO_CODEC: codecs => `Only these codecs are supported: ${codecs.join(', ')}`,
STREAM_CONNECTION_READONLY: 'Cannot send data to a read-only stream',
STREAM_CANNOT_JOIN: 'Cannot join a stream to itself',
VOICE_USER_NOT_STREAMING: 'User is not streaming',
};
for (const [name, message] of Object.entries(Messages)) register(name, message);

View File

@@ -19,6 +19,60 @@ const TextSortableGroupTypes = ['GUILD_TEXT', 'GUILD_ANNOUCMENT', 'GUILD_FORUM']
const VoiceSortableGroupTypes = ['GUILD_VOICE', 'GUILD_STAGE_VOICE'];
const CategorySortableGroupTypes = ['GUILD_CATEGORY'];
const payloadTypes = [
{
name: 'opus',
type: 'audio',
priority: 1000,
payload_type: 120,
},
{
name: 'AV1',
type: 'video',
priority: 1000,
payload_type: 101,
rtx_payload_type: 102,
encode: false,
decode: false,
},
{
name: 'H265',
type: 'video',
priority: 2000,
payload_type: 103,
rtx_payload_type: 104,
encode: false,
decode: false,
},
{
name: 'H264',
type: 'video',
priority: 3000,
payload_type: 105,
rtx_payload_type: 106,
encode: true,
decode: true,
},
{
name: 'VP8',
type: 'video',
priority: 4000,
payload_type: 107,
rtx_payload_type: 108,
encode: true,
decode: false,
},
{
name: 'VP9',
type: 'video',
priority: 5000,
payload_type: 109,
rtx_payload_type: 110,
encode: true,
decode: false,
},
];
/**
* Contains various general-purpose utility methods.
*/
@@ -881,6 +935,54 @@ class Util extends null {
return Object.keys(data).length > 0 ? data : undefined;
}
static getAllPayloadType() {
return payloadTypes;
}
static getPayloadType(codecName) {
return payloadTypes.find(p => p.name === codecName).payload_type;
}
static getSDPCodecName(packet, portUdp) {
let payload, payloadType;
if (typeof packet === 'string') {
payload = payloadTypes.find(p => p.name === packet);
payloadType = payload.payload_type;
} else {
const payloadType = packet[1] > 120 ? packet[1] & 0x80 : packet[1];
console.log('payloadType', payloadType, packet, portUdp);
payload = payloadTypes.find(p => p.payload_type === payloadType);
}
let sdpData = `o=- 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
a=tool:libavformat 61.1.100
m=video ${portUdp} RTP/AVP ${payloadType}
a=rtpmap:${payloadType} ${payload.name}/90000
#Placeholder
a=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level
a=extmap:2 http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time
a=extmap:3 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01
a=extmap:4 urn:ietf:params:rtp-hdrext:sdes:mid
a=extmap:5 http://www.webrtc.org/experiments/rtp-hdrext/playout-delay
a=extmap:6 http://www.webrtc.org/experiments/rtp-hdrext/video-content-type
a=extmap:7 http://www.webrtc.org/experiments/rtp-hdrext/video-timing
a=extmap:8 http://www.webrtc.org/experiments/rtp-hdrext/color-space
a=extmap:10 urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id
a=extmap:11 urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id
a=extmap:13 urn:3gpp:video-orientation
a=extmap:14 urn:ietf:params:rtp-hdrext:toffset
`;
if (payload.name === 'H264') {
sdpData = sdpData.replace(
'#Placeholder',
`a=fmtp:${payloadType} profile-level-id=42e01f;sprop-parameter-sets=Z0IAH6tAoAt2AtwEBAaQeJEV,aM4JyA==;packetization-mode=1`,
);
}
return sdpData;
}
}
module.exports = Util;