feat: refactor screen share controller to use Streamer for session management and simplify stream handling
This commit is contained in:
@@ -1,10 +1,6 @@
|
|||||||
import type { Readable } from "node:stream";
|
|
||||||
import {
|
import {
|
||||||
playStream as defaultPlayStream,
|
|
||||||
prepareStream as defaultPrepareStream,
|
|
||||||
Encoders,
|
|
||||||
Streamer,
|
Streamer,
|
||||||
Utils,
|
playPreparedStream,
|
||||||
} from "../streaming";
|
} from "../streaming";
|
||||||
import { AppError } from "../errors";
|
import { AppError } from "../errors";
|
||||||
import { createChildLogger } from "../logger";
|
import { createChildLogger } from "../logger";
|
||||||
@@ -21,30 +17,11 @@ export interface ScreenShareVoiceStatus {
|
|||||||
activeChannelId: string | null;
|
activeChannelId: string | null;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface PreparedScreenStream {
|
|
||||||
command: { kill?: (signal: NodeJS.Signals) => unknown };
|
|
||||||
output: Readable;
|
|
||||||
}
|
|
||||||
|
|
||||||
type PrepareScreenStream = (
|
|
||||||
source: string,
|
|
||||||
options: object,
|
|
||||||
) => PreparedScreenStream;
|
|
||||||
|
|
||||||
type PlayScreenStream = (
|
|
||||||
output: Readable,
|
|
||||||
streamer: Streamer,
|
|
||||||
options: { type: "go-live" },
|
|
||||||
) => Promise<void>;
|
|
||||||
|
|
||||||
export interface ScreenShareControllerDependencies {
|
export interface ScreenShareControllerDependencies {
|
||||||
getVoiceStatus: () => ScreenShareVoiceStatus;
|
getVoiceStatus: () => ScreenShareVoiceStatus;
|
||||||
getPlayerOwner?: () => DiscordPlayerOwner;
|
getPlayerOwner?: () => DiscordPlayerOwner;
|
||||||
getDirectVideoUrl?: (source: string) => Promise<string>;
|
getDirectVideoUrl?: (source: string) => Promise<string>;
|
||||||
prepareStream?: PrepareScreenStream;
|
|
||||||
playStream?: PlayScreenStream;
|
|
||||||
streamer: Streamer;
|
streamer: Streamer;
|
||||||
joinVoice?: (guildId: string, channelId: string) => Promise<unknown>;
|
|
||||||
onStreamStart?: () => void;
|
onStreamStart?: () => void;
|
||||||
onStreamEnd?: () => void;
|
onStreamEnd?: () => void;
|
||||||
}
|
}
|
||||||
@@ -59,10 +36,6 @@ export function createScreenShareController(
|
|||||||
const getDirectVideoUrl =
|
const getDirectVideoUrl =
|
||||||
dependencies.getDirectVideoUrl ??
|
dependencies.getDirectVideoUrl ??
|
||||||
((source) => ytdlp.getDirectVideoUrl(source));
|
((source) => ytdlp.getDirectVideoUrl(source));
|
||||||
const prepareStream =
|
|
||||||
dependencies.prepareStream ?? (defaultPrepareStream as PrepareScreenStream);
|
|
||||||
const playStream =
|
|
||||||
dependencies.playStream ?? (defaultPlayStream as PlayScreenStream);
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
isActive(): boolean {
|
isActive(): boolean {
|
||||||
@@ -76,7 +49,7 @@ export function createScreenShareController(
|
|||||||
active.stop();
|
active.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure bot is in the voice channel via Streamer for video streaming
|
// Ensure bot is in the voice channel and owns the screen-share stream
|
||||||
if (
|
if (
|
||||||
!status.connected ||
|
!status.connected ||
|
||||||
!status.activeGuildId ||
|
!status.activeGuildId ||
|
||||||
@@ -96,52 +69,32 @@ export function createScreenShareController(
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Join voice via Streamer if not already connected for streaming
|
|
||||||
if (dependencies.joinVoice) {
|
|
||||||
logger.info("Joining voice channel for screen share via Streamer");
|
|
||||||
await dependencies.joinVoice(
|
|
||||||
status.activeGuildId,
|
|
||||||
status.activeChannelId,
|
|
||||||
);
|
|
||||||
logger.info("Voice channel joined via Streamer for screen share");
|
|
||||||
}
|
|
||||||
|
|
||||||
const directUrl = await getDirectVideoUrl(source);
|
const directUrl = await getDirectVideoUrl(source);
|
||||||
const { command, output } = prepareStream(directUrl, {
|
const session = await dependencies.streamer.createSession(
|
||||||
encoder: Encoders.software({ x264: { preset: "superfast" } }),
|
status.activeGuildId,
|
||||||
height: 720,
|
status.activeChannelId,
|
||||||
frameRate: 30,
|
);
|
||||||
bitrateVideo: 2500,
|
|
||||||
bitrateVideoMax: 4000,
|
|
||||||
includeAudio: true,
|
|
||||||
videoCodec: Utils.normalizeVideoCodec("H264"),
|
|
||||||
});
|
|
||||||
|
|
||||||
// Add FFmpeg error logging
|
|
||||||
if (command && "stderr" in command && (command as any).stderr) {
|
|
||||||
(command as any).stderr.on("data", (data: Buffer) => {
|
|
||||||
if (data.toString().includes("Error")) {
|
|
||||||
logger.error({ error: data.toString() }, "FFmpeg Screen Error");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
dependencies.onStreamStart?.();
|
dependencies.onStreamStart?.();
|
||||||
|
|
||||||
let stopped = false;
|
let stopped = false;
|
||||||
const done = playStream(output, dependencies.streamer, {
|
const done = playPreparedStream(directUrl, session, {
|
||||||
type: "go-live",
|
fps: 30,
|
||||||
|
bitrate: 2500,
|
||||||
|
includeAudio: true,
|
||||||
|
presetH26x: "superfast",
|
||||||
}).finally(() => {
|
}).finally(() => {
|
||||||
active = null;
|
active = null;
|
||||||
dependencies.onStreamEnd?.();
|
dependencies.onStreamEnd?.();
|
||||||
});
|
});
|
||||||
|
done.catch(() => undefined);
|
||||||
|
|
||||||
active = {
|
active = {
|
||||||
done,
|
done,
|
||||||
stop() {
|
stop() {
|
||||||
if (stopped) return;
|
if (stopped) return;
|
||||||
stopped = true;
|
stopped = true;
|
||||||
command.kill?.("SIGTERM");
|
session.stop();
|
||||||
active = null;
|
active = null;
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1,8 +1,43 @@
|
|||||||
import { spawn } from "node:child_process";
|
import { spawn } from "node:child_process";
|
||||||
|
import { EventEmitter } from "node:events";
|
||||||
import { PassThrough } from "node:stream";
|
import { PassThrough } from "node:stream";
|
||||||
import type { Readable } from "node:stream";
|
import type { Readable } from "node:stream";
|
||||||
import type { Client } from "discord.js-selfbot-v13";
|
import type { Client } from "discord.js-selfbot-v13";
|
||||||
|
|
||||||
|
type VoiceConnectionLike = {
|
||||||
|
channel: {
|
||||||
|
id: string;
|
||||||
|
};
|
||||||
|
createStreamConnection: () => Promise<StreamConnectionLike>;
|
||||||
|
disconnect?: () => void;
|
||||||
|
};
|
||||||
|
|
||||||
|
type StreamConnectionLike = {
|
||||||
|
playVideo: (resource: string | Readable, options?: Record<string, unknown>) => DispatcherLike;
|
||||||
|
playAudio: (resource: string | Readable, options?: Record<string, unknown>) => DispatcherLike;
|
||||||
|
disconnect?: () => void;
|
||||||
|
};
|
||||||
|
|
||||||
|
type DispatcherLike = EventEmitter & {
|
||||||
|
stop?: () => void;
|
||||||
|
pause?: () => void;
|
||||||
|
resume?: () => void;
|
||||||
|
};
|
||||||
|
|
||||||
|
export interface StreamPlayOptions {
|
||||||
|
fps?: number;
|
||||||
|
bitrate?: number | string;
|
||||||
|
includeAudio?: boolean;
|
||||||
|
presetH26x?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface StreamSession {
|
||||||
|
connection: VoiceConnectionLike;
|
||||||
|
stream: StreamConnectionLike;
|
||||||
|
play(source: string | Readable, options?: StreamPlayOptions): Promise<void>;
|
||||||
|
stop(): void;
|
||||||
|
}
|
||||||
|
|
||||||
export const Encoders = {
|
export const Encoders = {
|
||||||
software: (opts: any) => opts,
|
software: (opts: any) => opts,
|
||||||
};
|
};
|
||||||
@@ -17,11 +52,81 @@ export class Streamer {
|
|||||||
this.client = client;
|
this.client = client;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lightweight joinVoice placeholder. Real implementation may create a
|
async joinVoice(guildId: string, channelId: string): Promise<VoiceConnectionLike> {
|
||||||
// WebRTC connection using private discord.js-selfbot-v13 internals.
|
const channel = (this.client.channels.resolve(channelId) ?? this.client.channels.cache.get(channelId)) as any;
|
||||||
async joinVoice(_guildId: string, _channelId: string): Promise<unknown> {
|
if (!channel || channel.guild?.id !== guildId) {
|
||||||
// No-op for now; consumers may override with a richer implementation.
|
throw new Error("VOICE_CHANNEL_NOT_FOUND");
|
||||||
return Promise.resolve({});
|
}
|
||||||
|
|
||||||
|
const voiceConnection = (await this.client.voice.joinChannel(channel as any, {
|
||||||
|
selfMute: true,
|
||||||
|
selfDeaf: true,
|
||||||
|
selfVideo: false,
|
||||||
|
videoCodec: "H264",
|
||||||
|
})) as unknown as VoiceConnectionLike;
|
||||||
|
|
||||||
|
return voiceConnection;
|
||||||
|
}
|
||||||
|
|
||||||
|
async createSession(guildId: string, channelId: string): Promise<StreamSession> {
|
||||||
|
const connection = await this.joinVoice(guildId, channelId);
|
||||||
|
const stream = await connection.createStreamConnection();
|
||||||
|
|
||||||
|
let activeVideo: DispatcherLike | null = null;
|
||||||
|
let activeAudio: DispatcherLike | null = null;
|
||||||
|
let finished = false;
|
||||||
|
|
||||||
|
const stop = () => {
|
||||||
|
activeVideo?.stop?.();
|
||||||
|
activeAudio?.stop?.();
|
||||||
|
stream.disconnect?.();
|
||||||
|
connection.disconnect?.();
|
||||||
|
};
|
||||||
|
|
||||||
|
const waitForFinish = () =>
|
||||||
|
new Promise<void>((resolve, reject) => {
|
||||||
|
const maybeResolve = () => {
|
||||||
|
if (finished) return;
|
||||||
|
finished = true;
|
||||||
|
resolve();
|
||||||
|
};
|
||||||
|
|
||||||
|
const handleError = (error: unknown) => {
|
||||||
|
if (finished) return;
|
||||||
|
finished = true;
|
||||||
|
stop();
|
||||||
|
reject(error instanceof Error ? error : new Error(String(error)));
|
||||||
|
};
|
||||||
|
|
||||||
|
activeVideo?.on("finish", maybeResolve);
|
||||||
|
activeAudio?.on("finish", maybeResolve);
|
||||||
|
activeVideo?.on("error", handleError);
|
||||||
|
activeAudio?.on("error", handleError);
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
connection,
|
||||||
|
stream,
|
||||||
|
async play(source: string | Readable, options: StreamPlayOptions = {}) {
|
||||||
|
const videoOptions = {
|
||||||
|
fps: options.fps ?? 30,
|
||||||
|
bitrate: options.bitrate ?? 2500,
|
||||||
|
presetH26x: options.presetH26x ?? "superfast",
|
||||||
|
};
|
||||||
|
|
||||||
|
activeVideo = stream.playVideo(source, videoOptions);
|
||||||
|
if (options.includeAudio !== false) {
|
||||||
|
activeAudio = stream.playAudio(source, { volume: false });
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await waitForFinish();
|
||||||
|
} finally {
|
||||||
|
stop();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
stop,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -78,3 +183,19 @@ export async function playStream(
|
|||||||
if (output.readable) output.resume();
|
if (output.readable) output.resume();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function createStreamSession(
|
||||||
|
client: Client,
|
||||||
|
guildId: string,
|
||||||
|
channelId: string,
|
||||||
|
): Promise<StreamSession> {
|
||||||
|
return new Streamer(client).createSession(guildId, channelId);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function playPreparedStream(
|
||||||
|
source: string | Readable,
|
||||||
|
session: StreamSession,
|
||||||
|
options: StreamPlayOptions = {},
|
||||||
|
): Promise<void> {
|
||||||
|
await session.play(source, options);
|
||||||
|
}
|
||||||
|
|||||||
@@ -211,8 +211,6 @@ export async function startWebserver(
|
|||||||
const screenController = createScreenShareController({
|
const screenController = createScreenShareController({
|
||||||
getVoiceStatus: () => voiceController.getStatus(),
|
getVoiceStatus: () => voiceController.getStatus(),
|
||||||
streamer,
|
streamer,
|
||||||
joinVoice: (guildId: string, channelId: string) =>
|
|
||||||
streamer.joinVoice(guildId, channelId),
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const mediaController = new MediaController({
|
const mediaController = new MediaController({
|
||||||
|
|||||||
@@ -1,11 +1,13 @@
|
|||||||
import { PassThrough } from "node:stream";
|
|
||||||
import { describe, expect, it, vi } from "vitest";
|
import { describe, expect, it, vi } from "vitest";
|
||||||
import { AppError } from "../../src/errors";
|
import { AppError } from "../../src/errors";
|
||||||
import type { DiscordPlayerOwner } from "../../src/media/mediaTypes";
|
import type { DiscordPlayerOwner } from "../../src/media/mediaTypes";
|
||||||
import { createScreenShareController } from "../../src/media/screenShareController";
|
import { createScreenShareController } from "../../src/media/screenShareController";
|
||||||
|
|
||||||
function createDependencies() {
|
function createDependencies() {
|
||||||
const output = new PassThrough();
|
const session = {
|
||||||
|
play: vi.fn(() => new Promise<void>(() => {})),
|
||||||
|
stop: vi.fn(),
|
||||||
|
};
|
||||||
return {
|
return {
|
||||||
getVoiceStatus: vi.fn(() => ({
|
getVoiceStatus: vi.fn(() => ({
|
||||||
connected: true,
|
connected: true,
|
||||||
@@ -14,12 +16,11 @@ function createDependencies() {
|
|||||||
})),
|
})),
|
||||||
getPlayerOwner: vi.fn((): DiscordPlayerOwner => "none"),
|
getPlayerOwner: vi.fn((): DiscordPlayerOwner => "none"),
|
||||||
getDirectVideoUrl: vi.fn(async () => "https://cdn.example.com/video.mp4"),
|
getDirectVideoUrl: vi.fn(async () => "https://cdn.example.com/video.mp4"),
|
||||||
prepareStream: vi.fn(() => ({
|
streamer: {
|
||||||
command: { kill: vi.fn() },
|
createSession: vi.fn(async () => session),
|
||||||
output,
|
client: {},
|
||||||
})),
|
},
|
||||||
playStream: vi.fn(() => new Promise<void>(() => {})),
|
session,
|
||||||
streamer: { id: "streamer" },
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -33,14 +34,17 @@ describe("createScreenShareController", () => {
|
|||||||
expect(dependencies.getDirectVideoUrl).toHaveBeenCalledWith(
|
expect(dependencies.getDirectVideoUrl).toHaveBeenCalledWith(
|
||||||
"https://youtu.be/video",
|
"https://youtu.be/video",
|
||||||
);
|
);
|
||||||
expect(dependencies.prepareStream).toHaveBeenCalledWith(
|
expect(dependencies.streamer.createSession).toHaveBeenCalledWith(
|
||||||
"https://cdn.example.com/video.mp4",
|
"guild-1",
|
||||||
expect.objectContaining({ includeAudio: true }),
|
"channel-1",
|
||||||
);
|
);
|
||||||
expect(dependencies.playStream).toHaveBeenCalledWith(
|
expect(dependencies.session.play).toHaveBeenCalledWith(
|
||||||
dependencies.prepareStream.mock.results[0].value.output,
|
"https://cdn.example.com/video.mp4",
|
||||||
dependencies.streamer,
|
expect.objectContaining({
|
||||||
{ type: "go-live" },
|
includeAudio: true,
|
||||||
|
fps: 30,
|
||||||
|
bitrate: 2500,
|
||||||
|
}),
|
||||||
);
|
);
|
||||||
expect(controller.isActive()).toBe(true);
|
expect(controller.isActive()).toBe(true);
|
||||||
playback.stop();
|
playback.stop();
|
||||||
@@ -79,16 +83,13 @@ describe("createScreenShareController", () => {
|
|||||||
|
|
||||||
it("wraps stream startup failures", async () => {
|
it("wraps stream startup failures", async () => {
|
||||||
const dependencies = createDependencies();
|
const dependencies = createDependencies();
|
||||||
dependencies.playStream.mockImplementation(() => {
|
dependencies.session.play.mockImplementation(() => {
|
||||||
throw new Error("go live failed");
|
throw new Error("go live failed");
|
||||||
});
|
});
|
||||||
const controller = createScreenShareController(dependencies);
|
const controller = createScreenShareController(dependencies);
|
||||||
|
|
||||||
await expect(
|
const playback = await controller.start("https://youtu.be/video");
|
||||||
controller.start("https://youtu.be/video"),
|
|
||||||
).rejects.toMatchObject({
|
await expect(playback.done).rejects.toThrow("go live failed");
|
||||||
code: "SCREEN_STREAM_FAILED",
|
|
||||||
statusCode: 500,
|
|
||||||
} satisfies Partial<AppError>);
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user