fix: harden moderation broadcaster
This commit is contained in:
@@ -5,13 +5,25 @@ import type {
|
|||||||
MessageRecord,
|
MessageRecord,
|
||||||
ModerationWsEvent,
|
ModerationWsEvent,
|
||||||
} from "./types";
|
} from "./types";
|
||||||
|
import { createChildLogger } from "../logger";
|
||||||
|
|
||||||
type ClientLike = Pick<WebSocket, "readyState" | "send">;
|
type ClientLike = Pick<WebSocket, "readyState" | "send">;
|
||||||
|
|
||||||
|
const log = createChildLogger("broadcaster");
|
||||||
|
|
||||||
function sendJson(clients: Set<ClientLike>, event: ModerationWsEvent): void {
|
function sendJson(clients: Set<ClientLike>, event: ModerationWsEvent): void {
|
||||||
const payload = JSON.stringify({ ...event, timestamp: Date.now() });
|
const payload = JSON.stringify({ ...event, timestamp: Date.now() });
|
||||||
for (const client of clients) {
|
for (const client of clients) {
|
||||||
if (client.readyState === 1) client.send(payload);
|
if (client.readyState === 1) {
|
||||||
|
try {
|
||||||
|
client.send(payload);
|
||||||
|
} catch (error) {
|
||||||
|
log.warn(
|
||||||
|
{ error, eventType: event.type },
|
||||||
|
"Failed to send event to client"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -21,9 +33,11 @@ export function createBroadcaster() {
|
|||||||
return {
|
return {
|
||||||
addClient(client: ClientLike) {
|
addClient(client: ClientLike) {
|
||||||
clients.add(client);
|
clients.add(client);
|
||||||
|
log.debug({ clientCount: clients.size }, "Client added");
|
||||||
},
|
},
|
||||||
removeClient(client: ClientLike) {
|
removeClient(client: ClientLike) {
|
||||||
clients.delete(client);
|
clients.delete(client);
|
||||||
|
log.debug({ clientCount: clients.size }, "Client removed");
|
||||||
},
|
},
|
||||||
clientCount() {
|
clientCount() {
|
||||||
return clients.size;
|
return clients.size;
|
||||||
|
|||||||
@@ -29,4 +29,88 @@ describe("createBroadcaster", () => {
|
|||||||
|
|
||||||
expect(ws.send).not.toHaveBeenCalled();
|
expect(ws.send).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("broadcasts to multiple open clients", () => {
|
||||||
|
const ws1 = client();
|
||||||
|
const ws2 = client();
|
||||||
|
const ws3 = client();
|
||||||
|
const broadcaster = createBroadcaster();
|
||||||
|
|
||||||
|
broadcaster.addClient(ws1 as any);
|
||||||
|
broadcaster.addClient(ws2 as any);
|
||||||
|
broadcaster.addClient(ws3 as any);
|
||||||
|
|
||||||
|
broadcaster.messageCreated({
|
||||||
|
id: "m1",
|
||||||
|
content: "test",
|
||||||
|
} as any);
|
||||||
|
|
||||||
|
expect(ws1.send).toHaveBeenCalledTimes(1);
|
||||||
|
expect(ws2.send).toHaveBeenCalledTimes(1);
|
||||||
|
expect(ws3.send).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("failed send on one client does not prevent another client from receiving event", () => {
|
||||||
|
const ws1 = client();
|
||||||
|
const ws2 = client();
|
||||||
|
const ws3 = client();
|
||||||
|
const broadcaster = createBroadcaster();
|
||||||
|
|
||||||
|
// ws1 throws on send
|
||||||
|
ws1.send.mockImplementation(() => {
|
||||||
|
throw new Error("Send failed");
|
||||||
|
});
|
||||||
|
|
||||||
|
broadcaster.addClient(ws1 as any);
|
||||||
|
broadcaster.addClient(ws2 as any);
|
||||||
|
broadcaster.addClient(ws3 as any);
|
||||||
|
|
||||||
|
broadcaster.messageUpdated({
|
||||||
|
id: "m1",
|
||||||
|
content: "updated",
|
||||||
|
} as any);
|
||||||
|
|
||||||
|
// ws1 attempted send (threw)
|
||||||
|
expect(ws1.send).toHaveBeenCalledTimes(1);
|
||||||
|
// ws2 and ws3 should still receive the event
|
||||||
|
expect(ws2.send).toHaveBeenCalledTimes(1);
|
||||||
|
expect(ws3.send).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("clientCount tracks add/remove", () => {
|
||||||
|
const ws1 = client();
|
||||||
|
const ws2 = client();
|
||||||
|
const broadcaster = createBroadcaster();
|
||||||
|
|
||||||
|
expect(broadcaster.clientCount()).toBe(0);
|
||||||
|
|
||||||
|
broadcaster.addClient(ws1 as any);
|
||||||
|
expect(broadcaster.clientCount()).toBe(1);
|
||||||
|
|
||||||
|
broadcaster.addClient(ws2 as any);
|
||||||
|
expect(broadcaster.clientCount()).toBe(2);
|
||||||
|
|
||||||
|
broadcaster.removeClient(ws1 as any);
|
||||||
|
expect(broadcaster.clientCount()).toBe(1);
|
||||||
|
|
||||||
|
broadcaster.removeClient(ws2 as any);
|
||||||
|
expect(broadcaster.clientCount()).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("payload includes numeric timestamp", () => {
|
||||||
|
const ws = client();
|
||||||
|
const broadcaster = createBroadcaster();
|
||||||
|
|
||||||
|
broadcaster.addClient(ws as any);
|
||||||
|
broadcaster.attachmentCreated({
|
||||||
|
id: "a1",
|
||||||
|
message_id: "m1",
|
||||||
|
} as any);
|
||||||
|
|
||||||
|
expect(ws.send).toHaveBeenCalledTimes(1);
|
||||||
|
const payload = JSON.parse(ws.send.mock.calls[0][0]);
|
||||||
|
expect(payload.timestamp).toBeDefined();
|
||||||
|
expect(typeof payload.timestamp).toBe("number");
|
||||||
|
expect(payload.timestamp).toBeGreaterThan(0);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user