diff --git a/src/client/voice/VoiceConnection.js b/src/client/voice/VoiceConnection.js index 859c2d5..7a417b2 100644 --- a/src/client/voice/VoiceConnection.js +++ b/src/client/voice/VoiceConnection.js @@ -586,7 +586,7 @@ class VoiceConnection extends EventEmitter { rtx_ssrc: 27735, rid: '100', quality: 100, - max_resolution: [Object], + max_resolution: { width: 0, type: 'source', height: 0 },, max_framerate: 60, active: false } diff --git a/src/client/voice/player/MediaPlayer.js b/src/client/voice/player/MediaPlayer.js index 882607f..6dd4f80 100644 --- a/src/client/voice/player/MediaPlayer.js +++ b/src/client/voice/player/MediaPlayer.js @@ -144,7 +144,8 @@ class MediaPlayer extends EventEmitter { // Get stream type if (this.voiceConnection.videoCodec == 'VP8') { - args.push('-f', 'ivf', '-deadline', 'realtime', '-c:v', options?.copy ? 'copy' : 'libvpx', '-speed', '5'); + args.push('-f', 'ivf', '-deadline', 'realtime', '-c:v', options?.copy ? 'copy' : 'libvpx'); + // Remove '-speed', '5' bc bad quality } if (this.voiceConnection.videoCodec == 'H264') { diff --git a/src/client/voice/receiver/PacketHandler.js b/src/client/voice/receiver/PacketHandler.js index 00d9a44..0877f70 100644 --- a/src/client/voice/receiver/PacketHandler.js +++ b/src/client/voice/receiver/PacketHandler.js @@ -3,6 +3,7 @@ const EventEmitter = require('events'); const { Buffer } = require('node:buffer'); const { setTimeout } = require('node:timers'); +const { IvfJoinner } = require('./video/IvfJoinner'); const Speaking = require('../../../util/Speaking'); const secretbox = require('../util/Secretbox'); const { SILENCE_FRAME } = require('../util/Silence'); @@ -21,6 +22,7 @@ class PacketHandler extends EventEmitter { this.nonce = Buffer.alloc(24); this.receiver = receiver; this.streams = new Map(); + this.videoStreams = new Map(); this.speakingTimeouts = new Map(); } @@ -44,6 +46,14 @@ class PacketHandler extends EventEmitter { return stream; } + makeVideoStream(user) { + if (this.videoStreams.has(user)) return this.videoStreams.get(user); + const stream = new IvfJoinner('VP8'); // Test VP8 ok + stream.stream.on('end', () => this.videoStreams.delete(user)); + this.videoStreams.set(user, stream); + return stream; + } + parseBuffer(buffer) { const { secret_key, mode } = this.receiver.connection.authentication; @@ -60,6 +70,7 @@ class PacketHandler extends EventEmitter { } // Open packet + if (!secret_key) return new Error('secret_key cannot be null or undefined'); let packet = secretbox.methods.open(buffer.slice(12, end), this.nonce, secret_key); if (!packet) return new Error('Failed to decrypt voice packet'); packet = Buffer.from(packet); @@ -70,26 +81,34 @@ class PacketHandler extends EventEmitter { packet = packet.subarray(4 + 4 * headerExtensionLength); } + // Ex VP8 + // + // 90 80: payloadDescriptorBuf (90 80 if first frame | 80 80 else) + // 80 00: pictureIdBuf + // n bytes: chunk raw (Ivf splitter) + return packet; } push(buffer) { const ssrc = buffer.readUInt32BE(8); - const userStat = this.connection.ssrcMap.get(ssrc); + const userStat = this.connection.ssrcMap.get(ssrc) || this.connection.ssrcMap.get(ssrc - 1); // Maybe vidoe_ssrc ? if (!userStat) return; let opusPacket; const streamInfo = this.streams.get(userStat.userId); + const videoStreamInfo = this.videoStreams.get(userStat.userId); + // If the user is in video, we need to check if the packet is just silence if (userStat.hasVideo) { opusPacket = this.parseBuffer(buffer); if (opusPacket instanceof Error) { // Only emit an error if we were actively receiving packets from this user - if (streamInfo) { + if (streamInfo || videoStreamInfo) { this.emit('error', opusPacket); - return; } + return; } if (SILENCE_FRAME.equals(opusPacket)) { // If this is a silence frame, pretend we never received it @@ -98,25 +117,28 @@ class PacketHandler extends EventEmitter { } let speakingTimeout = this.speakingTimeouts.get(ssrc); - if (typeof speakingTimeout === 'undefined') { - // Ensure at least the speaking bit is set. - // As the object is by reference, it's only needed once per client re-connect. - if (userStat.speaking === 0) { - userStat.speaking = Speaking.FLAGS.SPEAKING; - } - this.connection.onSpeaking({ user_id: userStat.userId, ssrc: ssrc, speaking: userStat.speaking }); - speakingTimeout = setTimeout(() => { - try { - this.connection.onSpeaking({ user_id: userStat.userId, ssrc: ssrc, speaking: 0 }); - clearTimeout(speakingTimeout); - this.speakingTimeouts.delete(ssrc); - } catch { - // Connection already closed, ignore + // Only for voice... idk + if (this.connection.ssrcMap.has(ssrc)) { + if (typeof speakingTimeout === 'undefined') { + // Ensure at least the speaking bit is set. + // As the object is by reference, it's only needed once per client re-connect. + if (userStat.speaking === 0) { + userStat.speaking = Speaking.FLAGS.SPEAKING; } - }, DISCORD_SPEAKING_DELAY).unref(); - this.speakingTimeouts.set(ssrc, speakingTimeout); - } else { - speakingTimeout.refresh(); + this.connection.onSpeaking({ user_id: userStat.userId, ssrc: ssrc, speaking: userStat.speaking }); + speakingTimeout = setTimeout(() => { + try { + this.connection.onSpeaking({ user_id: userStat.userId, ssrc: ssrc, speaking: 0 }); + clearTimeout(speakingTimeout); + this.speakingTimeouts.delete(ssrc); + } catch { + // Connection already closed, ignore + } + }, DISCORD_SPEAKING_DELAY).unref(); + this.speakingTimeouts.set(ssrc, speakingTimeout); + } else { + speakingTimeout.refresh(); + } } if (streamInfo) { @@ -130,6 +152,18 @@ class PacketHandler extends EventEmitter { } stream.push(opusPacket); } + + if (videoStreamInfo) { + const stream = videoStreamInfo; + if (!opusPacket) { + opusPacket = this.parseBuffer(buffer); + if (opusPacket instanceof Error) { + this.emit('error', opusPacket); + return; + } + } + stream.push(opusPacket); // VP8 ? idk + } } } diff --git a/src/client/voice/receiver/Receiver.js b/src/client/voice/receiver/Receiver.js index 605d992..bd7e9c2 100644 --- a/src/client/voice/receiver/Receiver.js +++ b/src/client/voice/receiver/Receiver.js @@ -53,6 +53,30 @@ class VoiceReceiver extends EventEmitter { } return stream; } + + /** + * Creates a new video receiving stream. If a stream already exists for a user, then that stream will be returned + * rather than generating a new one. + * @param {UserResolvable} user The user to start listening to. + * @returns {IvfJoinner} + * @deprecated Only support VP8 + * @example + * const video = connection.receiver.createVideoStream('1071734918972985395'); + * video.stream.pipe(fs.createWriteStream('test.ivf')); + * setTimeout(() => { + * video.stop(); + * video.createFinalFile( + * fs.createReadStream('test.ivf'), + * fs.createWriteStream('final.ivf'), + * ); + * }, 10_000); + */ + createVideoStream(user) { + user = this.connection.client.users.resolve(user); + if (!user) throw new Error('VOICE_USER_MISSING'); + const stream = this.packets.makeVideoStream(user.id); + return stream; + } } module.exports = VoiceReceiver; diff --git a/src/client/voice/receiver/video/IvfJoinner.js b/src/client/voice/receiver/video/IvfJoinner.js new file mode 100644 index 00000000..bc37510 --- /dev/null +++ b/src/client/voice/receiver/video/IvfJoinner.js @@ -0,0 +1,106 @@ +'use strict'; + +const { Buffer } = require('buffer'); +const { setTimeout } = require('timers'); + +class Readable extends require('stream').Readable { + _read() {} // eslint-disable-line no-empty-function +} + +/** + * Receives video packets from a voice connection. + */ +class IvfJoinner { + constructor(codec = 'VP8') { + this.codec = codec; + this.ivfHeader = this.getHeaderIvf(); + this.count = 0; + /** + * Readable stream + * @type {Readable} + */ + this.stream = new Readable(); + this._tempBuffer = null; + this._fps = 0; + this.timeConvert = null; + this.lastConvert = null; + this.firstFrame = Buffer.from([0x90, 0x80]); + this._timeoutFps = null; + } + getHeaderIvf() { + const ivfHeader = Buffer.alloc(32); + ivfHeader.write('DKIF'); // Signature + ivfHeader.writeUInt16LE(0, 4); // Version + ivfHeader.writeUInt16LE(32, 6); // Header length + ivfHeader.write(`${this.codec}0`, 8); // Codec FourCC + ivfHeader.writeUInt16LE(0, 12); // Width + ivfHeader.writeUInt16LE(0, 14); // Height + ivfHeader.writeUInt32LE(this._fps, 16); // Frame rate + ivfHeader.writeUInt32LE(1, 20); // Framerate denominator + ivfHeader.writeUInt32LE(this.count + 1, 24); // Frame count + return ivfHeader; + } + getFramedata() { + const frameHeader = Buffer.alloc(12); + frameHeader.writeUInt32LE(this._tempBuffer.length, 0); // Frame size + frameHeader.writeUInt32LE(this.count, 4); // Timestamp + return frameHeader; + } + push(bufferRaw) { + if (!this._timeoutFps) { + this._timeoutFps = setTimeout(() => { + if (this.stream.destroyed) return; + this._fps = Math.round((this.lastConvert - this.timeConvert) / this.count); + // ! Todo: need improved + this._timeoutFps = null; + }, 500).unref(); + } + if (!this.timeConvert) { + this.timeConvert = performance.now(); + } + // Ex VP8 + // + // 90 80: payloadDescriptorBuf (90 80 if first frame | 80 80 else) + // 80 00: pictureIdBuf + // n bytes: chunk raw (Ivf splitter) + const payloadDescriptorBuf = bufferRaw.slice(0, 2); + const data = bufferRaw.slice(4); + const isFirstFrame = Buffer.compare(payloadDescriptorBuf, this.firstFrame) === 0; + if (isFirstFrame && this._tempBuffer) { + this.count++; + this.lastConvert = performance.now(); + this.stream.push(Buffer.concat([this.getFramedata(), this._tempBuffer])); + this._tempBuffer = null; + } + if (!this._tempBuffer) { + this._tempBuffer = data; + } else { + this._tempBuffer = Buffer.concat([this._tempBuffer, data]); + } + } + /** + * Force stop stream + * @returns {void} + */ + stop() { + this.stream.push(null); + this.stream.emit('end'); // Force close stream; + this.stream.destroy(); + } + /** + * Convert partial file to full file + * @param {Readable} readable File created by stream (Raw) + * @param {Writable} writeable Output (Ivf) + * @returns {void} + */ + createFinalFile(readable, writeable) { + if (this.stream.destroyed) { + writeable.write(this.getHeaderIvf()); + readable.pipe(writeable); + } + } +} + +module.exports = { + IvfJoinner, +}; diff --git a/typings/index.d.ts b/typings/index.d.ts index bfb181c..5cd1162 100644 --- a/typings/index.d.ts +++ b/typings/index.d.ts @@ -1076,6 +1076,8 @@ export class StreamConnection extends VoiceConnection { export class VoiceReceiver extends EventEmitter { constructor(connection: VoiceConnection); public createStream(user: UserResolvable, options?: { mode?: 'opus' | 'pcm'; end?: 'silence' | 'manual' }): Readable; + /** @deprecated Only recorded VP8 stream */ + public createVideoStream(user: UserResolvable): IvfJoinner; public on(event: 'debug', listener: (error: Error | string) => void): this; public on(event: string, listener: (...args: any[]) => void): this; @@ -1084,6 +1086,13 @@ export class VoiceReceiver extends EventEmitter { public once(event: string, listener: (...args: any[]) => void): this; } +export class IvfJoinner { + constructor(codec: 'VP8'); + public stream: Readable; + public stop(): void; + public createFinalFile(read: Readable, write: Writable): void; +} + export { Collection } from '@discordjs/collection'; export interface CollectorEventTypes {