From e8d489bbfef26efef79296bd33b5feae793d166c Mon Sep 17 00:00:00 2001 From: nvms Date: Wed, 4 Sep 2024 06:12:49 -0400 Subject: [PATCH] formatting, add broadcastRoomExclude, make WSContext typable --- packages/keepalive-ws/src/server/index.ts | 148 ++++++++++++++++++---- 1 file changed, 123 insertions(+), 25 deletions(-) diff --git a/packages/keepalive-ws/src/server/index.ts b/packages/keepalive-ws/src/server/index.ts index 66c888d..c4a37c9 100644 --- a/packages/keepalive-ws/src/server/index.ts +++ b/packages/keepalive-ws/src/server/index.ts @@ -4,46 +4,105 @@ import { bufferToCommand } from "./command"; import { Connection } from "./connection"; export declare interface KeepAliveServer extends WebSocketServer { - on(event: "connection", handler: (socket: WebSocket, req: IncomingMessage) => void): this; + on( + event: "connection", + handler: (socket: WebSocket, req: IncomingMessage) => void, + ): this; on(event: "connected", handler: (c: Connection) => void): this; on(event: "close", handler: (c: Connection) => void): this; on(event: "error", cb: (this: WebSocketServer, error: Error) => void): this; - on(event: "headers", cb: (this: WebSocketServer, headers: string[], request: IncomingMessage) => void): this; - on(event: string | symbol, listener: (this: WebSocketServer, ...args: any[]) => void): this; + on( + event: "headers", + cb: ( + this: WebSocketServer, + headers: string[], + request: IncomingMessage, + ) => void, + ): this; + on( + event: string | symbol, + listener: (this: WebSocketServer, ...args: any[]) => void, + ): this; emit(event: "connection", socket: WebSocket, req: IncomingMessage): boolean; emit(event: "connected", connection: Connection): boolean; emit(event: "close", connection: Connection): boolean; emit(event: "error", connection: Connection): boolean; - once(event: "connection", cb: (this: WebSocketServer, socket: WebSocket, request: IncomingMessage) => void): this; + once( + event: "connection", + cb: ( + this: WebSocketServer, + socket: WebSocket, + request: IncomingMessage, + ) => void, + ): this; once(event: "error", cb: (this: WebSocketServer, error: Error) => void): this; - once(event: "headers", cb: (this: WebSocketServer, headers: string[], request: IncomingMessage) => void): this; + once( + event: "headers", + cb: ( + this: WebSocketServer, + headers: string[], + request: IncomingMessage, + ) => void, + ): this; once(event: "close" | "listening", cb: (this: WebSocketServer) => void): this; - once(event: string | symbol, listener: (this: WebSocketServer, ...args: any[]) => void): this; + once( + event: string | symbol, + listener: (this: WebSocketServer, ...args: any[]) => void, + ): this; - off(event: "connection", cb: (this: WebSocketServer, socket: WebSocket, request: IncomingMessage) => void): this; + off( + event: "connection", + cb: ( + this: WebSocketServer, + socket: WebSocket, + request: IncomingMessage, + ) => void, + ): this; off(event: "error", cb: (this: WebSocketServer, error: Error) => void): this; - off(event: "headers", cb: (this: WebSocketServer, headers: string[], request: IncomingMessage) => void): this; + off( + event: "headers", + cb: ( + this: WebSocketServer, + headers: string[], + request: IncomingMessage, + ) => void, + ): this; off(event: "close" | "listening", cb: (this: WebSocketServer) => void): this; - off(event: string | symbol, listener: (this: WebSocketServer, ...args: any[]) => void): this; + off( + event: string | symbol, + listener: (this: WebSocketServer, ...args: any[]) => void, + ): this; - addListener(event: "connection", cb: (client: WebSocket, request: IncomingMessage) => void): this; + addListener( + event: "connection", + cb: (client: WebSocket, request: IncomingMessage) => void, + ): this; addListener(event: "error", cb: (err: Error) => void): this; - addListener(event: "headers", cb: (headers: string[], request: IncomingMessage) => void): this; + addListener( + event: "headers", + cb: (headers: string[], request: IncomingMessage) => void, + ): this; addListener(event: "close" | "listening", cb: () => void): this; addListener(event: string | symbol, listener: (...args: any[]) => void): this; removeListener(event: "connection", cb: (client: WebSocket) => void): this; removeListener(event: "error", cb: (err: Error) => void): this; - removeListener(event: "headers", cb: (headers: string[], request: IncomingMessage) => void): this; + removeListener( + event: "headers", + cb: (headers: string[], request: IncomingMessage) => void, + ): this; removeListener(event: "close" | "listening", cb: () => void): this; - removeListener(event: string | symbol, listener: (...args: any[]) => void): this; + removeListener( + event: string | symbol, + listener: (...args: any[]) => void, + ): this; } -export class WSContext { +export class WSContext { wss: KeepAliveServer; connection: Connection; - payload: any; + payload: T; constructor(wss: KeepAliveServer, connection: Connection, payload: any) { this.wss = wss; @@ -52,8 +111,7 @@ export class WSContext { } } - -export type SocketMiddleware = (c: WSContext) => any | Promise; +export type SocketMiddleware = (c: WSContext) => any | Promise; export type KeepAliveServerOptions = ServerOptions & { /** @@ -72,7 +130,7 @@ export type KeepAliveServerOptions = ServerOptions & { export class KeepAliveServer extends WebSocketServer { connections: { [id: string]: Connection } = {}; remoteAddressToConnections: { [address: string]: Connection[] } = {}; - commands: { [command: string]: (context: WSContext) => Promise } = {}; + commands: { [command: string]: (context: WSContext) => Promise } = {}; globalMiddlewares: SocketMiddleware[] = []; middlewares: { [key: string]: SocketMiddleware[] } = {}; rooms: { [roomName: string]: Set } = {}; @@ -92,9 +150,10 @@ export class KeepAliveServer extends WebSocketServer { c.stopIntervals(); delete this.connections[c.id]; if (this.remoteAddressToConnections[c.remoteAddress]) { - this.remoteAddressToConnections[c.remoteAddress] = this.remoteAddressToConnections[c.remoteAddress].filter( - (cn) => cn.id !== c.id - ); + this.remoteAddressToConnections[c.remoteAddress] = + this.remoteAddressToConnections[c.remoteAddress].filter( + (cn) => cn.id !== c.id, + ); } if (!this.remoteAddressToConnections[c.remoteAddress].length) { @@ -111,8 +170,9 @@ export class KeepAliveServer extends WebSocketServer { this.remoteAddressToConnections[connection.remoteAddress] = []; } - this.remoteAddressToConnections[connection.remoteAddress].push(connection); - + this.remoteAddressToConnections[connection.remoteAddress].push( + connection, + ); this.emit("connected", connection); @@ -196,6 +256,35 @@ export class KeepAliveServer extends WebSocketServer { }); } + /** + * Given a roomName, command, payload, and Connection OR Connection[], broadcasts to all Connections + * that are in the room except the provided Connection(s). + */ + broadcastRoomExclude( + roomName: string, + command: string, + payload: any, + connection: Connection | Connection[], + ) { + const cmd = JSON.stringify({ command, payload }); + const room = this.rooms[roomName]; + + if (!room) return; + + const excludeIds = Array.isArray(connection) + ? connection.map((c) => c.id) + : [connection.id]; + + room.forEach((connectionId) => { + if (!excludeIds.includes(connectionId)) { + const conn = this.connections[connectionId]; + if (conn) { + conn.socket.send(cmd); + } + } + }); + } + /** * Given a connection, broadcasts a message to all connections except * the provided connection. @@ -241,7 +330,11 @@ export class KeepAliveServer extends WebSocketServer { this.rooms[roomName] = new Set(); } - registerCommand(command: string, callback: SocketMiddleware, middlewares: SocketMiddleware[] = []) { + registerCommand( + command: string, + callback: SocketMiddleware, + middlewares: SocketMiddleware[] = [], + ) { this.commands[command] = callback; this.prependMiddlewareToCommand(command, middlewares); } @@ -260,7 +353,12 @@ export class KeepAliveServer extends WebSocketServer { } } - private async runCommand(id: number, command: string, payload: any, connection: Connection) { + private async runCommand( + id: number, + command: string, + payload: any, + connection: Connection, + ) { const c = new WSContext(this, connection, payload); try {