feat: Video receiver (2/n)

- Only VP8 supported
- With example JSDoc
- Tested but bad quality
This commit is contained in:
Elysia
2024-07-25 21:09:52 +07:00
parent 24e151805d
commit 74f49ba92d
6 changed files with 197 additions and 23 deletions

View File

@@ -586,7 +586,7 @@ class VoiceConnection extends EventEmitter {
rtx_ssrc: 27735,
rid: '100',
quality: 100,
max_resolution: [Object],
max_resolution: { width: 0, type: 'source', height: 0 },,
max_framerate: 60,
active: false
}

View File

@@ -144,7 +144,8 @@ class MediaPlayer extends EventEmitter {
// Get stream type
if (this.voiceConnection.videoCodec == 'VP8') {
args.push('-f', 'ivf', '-deadline', 'realtime', '-c:v', options?.copy ? 'copy' : 'libvpx', '-speed', '5');
args.push('-f', 'ivf', '-deadline', 'realtime', '-c:v', options?.copy ? 'copy' : 'libvpx');
// Remove '-speed', '5' bc bad quality
}
if (this.voiceConnection.videoCodec == 'H264') {

View File

@@ -3,6 +3,7 @@
const EventEmitter = require('events');
const { Buffer } = require('node:buffer');
const { setTimeout } = require('node:timers');
const { IvfJoinner } = require('./video/IvfJoinner');
const Speaking = require('../../../util/Speaking');
const secretbox = require('../util/Secretbox');
const { SILENCE_FRAME } = require('../util/Silence');
@@ -21,6 +22,7 @@ class PacketHandler extends EventEmitter {
this.nonce = Buffer.alloc(24);
this.receiver = receiver;
this.streams = new Map();
this.videoStreams = new Map();
this.speakingTimeouts = new Map();
}
@@ -44,6 +46,14 @@ class PacketHandler extends EventEmitter {
return stream;
}
makeVideoStream(user) {
if (this.videoStreams.has(user)) return this.videoStreams.get(user);
const stream = new IvfJoinner('VP8'); // Test VP8 ok
stream.stream.on('end', () => this.videoStreams.delete(user));
this.videoStreams.set(user, stream);
return stream;
}
parseBuffer(buffer) {
const { secret_key, mode } = this.receiver.connection.authentication;
@@ -60,6 +70,7 @@ class PacketHandler extends EventEmitter {
}
// Open packet
if (!secret_key) return new Error('secret_key cannot be null or undefined');
let packet = secretbox.methods.open(buffer.slice(12, end), this.nonce, secret_key);
if (!packet) return new Error('Failed to decrypt voice packet');
packet = Buffer.from(packet);
@@ -70,26 +81,34 @@ class PacketHandler extends EventEmitter {
packet = packet.subarray(4 + 4 * headerExtensionLength);
}
// Ex VP8
// <Buffer 90 80 80 00 30 b7 01 9d 01 2a 80 07 38 04 0b c7 08 85 85 88 99 84 88 3f 82 00 06 16 04 f7 06 81 64 9f 6b db 9b 27 38 7b 27 38 7b 27 38 7b 27 38 7b 27 ... 1154 more bytes>
// 90 80: payloadDescriptorBuf (90 80 if first frame | 80 80 else)
// 80 00: pictureIdBuf
// n bytes: chunk raw (Ivf splitter)
return packet;
}
push(buffer) {
const ssrc = buffer.readUInt32BE(8);
const userStat = this.connection.ssrcMap.get(ssrc);
const userStat = this.connection.ssrcMap.get(ssrc) || this.connection.ssrcMap.get(ssrc - 1); // Maybe vidoe_ssrc ?
if (!userStat) return;
let opusPacket;
const streamInfo = this.streams.get(userStat.userId);
const videoStreamInfo = this.videoStreams.get(userStat.userId);
// If the user is in video, we need to check if the packet is just silence
if (userStat.hasVideo) {
opusPacket = this.parseBuffer(buffer);
if (opusPacket instanceof Error) {
// Only emit an error if we were actively receiving packets from this user
if (streamInfo) {
if (streamInfo || videoStreamInfo) {
this.emit('error', opusPacket);
return;
}
return;
}
if (SILENCE_FRAME.equals(opusPacket)) {
// If this is a silence frame, pretend we never received it
@@ -98,25 +117,28 @@ class PacketHandler extends EventEmitter {
}
let speakingTimeout = this.speakingTimeouts.get(ssrc);
if (typeof speakingTimeout === 'undefined') {
// Ensure at least the speaking bit is set.
// As the object is by reference, it's only needed once per client re-connect.
if (userStat.speaking === 0) {
userStat.speaking = Speaking.FLAGS.SPEAKING;
}
this.connection.onSpeaking({ user_id: userStat.userId, ssrc: ssrc, speaking: userStat.speaking });
speakingTimeout = setTimeout(() => {
try {
this.connection.onSpeaking({ user_id: userStat.userId, ssrc: ssrc, speaking: 0 });
clearTimeout(speakingTimeout);
this.speakingTimeouts.delete(ssrc);
} catch {
// Connection already closed, ignore
// Only for voice... idk
if (this.connection.ssrcMap.has(ssrc)) {
if (typeof speakingTimeout === 'undefined') {
// Ensure at least the speaking bit is set.
// As the object is by reference, it's only needed once per client re-connect.
if (userStat.speaking === 0) {
userStat.speaking = Speaking.FLAGS.SPEAKING;
}
}, DISCORD_SPEAKING_DELAY).unref();
this.speakingTimeouts.set(ssrc, speakingTimeout);
} else {
speakingTimeout.refresh();
this.connection.onSpeaking({ user_id: userStat.userId, ssrc: ssrc, speaking: userStat.speaking });
speakingTimeout = setTimeout(() => {
try {
this.connection.onSpeaking({ user_id: userStat.userId, ssrc: ssrc, speaking: 0 });
clearTimeout(speakingTimeout);
this.speakingTimeouts.delete(ssrc);
} catch {
// Connection already closed, ignore
}
}, DISCORD_SPEAKING_DELAY).unref();
this.speakingTimeouts.set(ssrc, speakingTimeout);
} else {
speakingTimeout.refresh();
}
}
if (streamInfo) {
@@ -130,6 +152,18 @@ class PacketHandler extends EventEmitter {
}
stream.push(opusPacket);
}
if (videoStreamInfo) {
const stream = videoStreamInfo;
if (!opusPacket) {
opusPacket = this.parseBuffer(buffer);
if (opusPacket instanceof Error) {
this.emit('error', opusPacket);
return;
}
}
stream.push(opusPacket); // VP8 ? idk
}
}
}

View File

@@ -53,6 +53,30 @@ class VoiceReceiver extends EventEmitter {
}
return stream;
}
/**
* Creates a new video receiving stream. If a stream already exists for a user, then that stream will be returned
* rather than generating a new one.
* @param {UserResolvable} user The user to start listening to.
* @returns {IvfJoinner}
* @deprecated Only support VP8
* @example
* const video = connection.receiver.createVideoStream('1071734918972985395');
* video.stream.pipe(fs.createWriteStream('test.ivf'));
* setTimeout(() => {
* video.stop();
* video.createFinalFile(
* fs.createReadStream('test.ivf'),
* fs.createWriteStream('final.ivf'),
* );
* }, 10_000);
*/
createVideoStream(user) {
user = this.connection.client.users.resolve(user);
if (!user) throw new Error('VOICE_USER_MISSING');
const stream = this.packets.makeVideoStream(user.id);
return stream;
}
}
module.exports = VoiceReceiver;

View File

@@ -0,0 +1,106 @@
'use strict';
const { Buffer } = require('buffer');
const { setTimeout } = require('timers');
class Readable extends require('stream').Readable {
_read() {} // eslint-disable-line no-empty-function
}
/**
* Receives video packets from a voice connection.
*/
class IvfJoinner {
constructor(codec = 'VP8') {
this.codec = codec;
this.ivfHeader = this.getHeaderIvf();
this.count = 0;
/**
* Readable stream
* @type {Readable}
*/
this.stream = new Readable();
this._tempBuffer = null;
this._fps = 0;
this.timeConvert = null;
this.lastConvert = null;
this.firstFrame = Buffer.from([0x90, 0x80]);
this._timeoutFps = null;
}
getHeaderIvf() {
const ivfHeader = Buffer.alloc(32);
ivfHeader.write('DKIF'); // Signature
ivfHeader.writeUInt16LE(0, 4); // Version
ivfHeader.writeUInt16LE(32, 6); // Header length
ivfHeader.write(`${this.codec}0`, 8); // Codec FourCC
ivfHeader.writeUInt16LE(0, 12); // Width
ivfHeader.writeUInt16LE(0, 14); // Height
ivfHeader.writeUInt32LE(this._fps, 16); // Frame rate
ivfHeader.writeUInt32LE(1, 20); // Framerate denominator
ivfHeader.writeUInt32LE(this.count + 1, 24); // Frame count
return ivfHeader;
}
getFramedata() {
const frameHeader = Buffer.alloc(12);
frameHeader.writeUInt32LE(this._tempBuffer.length, 0); // Frame size
frameHeader.writeUInt32LE(this.count, 4); // Timestamp
return frameHeader;
}
push(bufferRaw) {
if (!this._timeoutFps) {
this._timeoutFps = setTimeout(() => {
if (this.stream.destroyed) return;
this._fps = Math.round((this.lastConvert - this.timeConvert) / this.count);
// ! Todo: need improved
this._timeoutFps = null;
}, 500).unref();
}
if (!this.timeConvert) {
this.timeConvert = performance.now();
}
// Ex VP8
// <Buffer 90 80 80 00 30 b7 01 9d 01 2a 80 07 38 04 0b c7 08 85 85 88 99 84 88 3f 82 00 06 16 04 f7 06 81 64 9f 6b db 9b 27 38 7b 27 38 7b 27 38 7b 27 38 7b 27 ... 1154 more bytes>
// 90 80: payloadDescriptorBuf (90 80 if first frame | 80 80 else)
// 80 00: pictureIdBuf
// n bytes: chunk raw (Ivf splitter)
const payloadDescriptorBuf = bufferRaw.slice(0, 2);
const data = bufferRaw.slice(4);
const isFirstFrame = Buffer.compare(payloadDescriptorBuf, this.firstFrame) === 0;
if (isFirstFrame && this._tempBuffer) {
this.count++;
this.lastConvert = performance.now();
this.stream.push(Buffer.concat([this.getFramedata(), this._tempBuffer]));
this._tempBuffer = null;
}
if (!this._tempBuffer) {
this._tempBuffer = data;
} else {
this._tempBuffer = Buffer.concat([this._tempBuffer, data]);
}
}
/**
* Force stop stream
* @returns {void}
*/
stop() {
this.stream.push(null);
this.stream.emit('end'); // Force close stream;
this.stream.destroy();
}
/**
* Convert partial file to full file
* @param {Readable} readable File created by stream (Raw)
* @param {Writable} writeable Output (Ivf)
* @returns {void}
*/
createFinalFile(readable, writeable) {
if (this.stream.destroyed) {
writeable.write(this.getHeaderIvf());
readable.pipe(writeable);
}
}
}
module.exports = {
IvfJoinner,
};