feat(keepalive-ws): enhance README and improve client/server implementation

- Add tests
This commit is contained in:
nvms 2025-03-26 21:09:28 -04:00
parent 2acba51367
commit 7714d71b0a
16 changed files with 1007 additions and 588 deletions

View File

@ -1,99 +1,173 @@
For a TCP-based, node-only solution with a similar API, see [duplex](https://github.com/node-prism/duplex).
# keepalive-ws # keepalive-ws
A command server and client for simplified WebSocket communication, with builtin ping and latency messaging. [![NPM version](https://img.shields.io/npm/v/@prsm/keepalive-ws?color=a1b858&label=)](https://www.npmjs.com/package/@prsm/keepalive-ws)
Built for [grove](https://github.com/node-prism/grove), but works anywhere. A command server and client for simplified WebSocket communication, with built-in ping and latency messaging. Provides reliable, Promise-based communication with automatic reconnection and command queueing.
### Server For a TCP-based, node-only solution with a similar API, see [duplex](https://github.com/node-prism/duplex).
For node. ## Features
- **Promise-based API** - All operations return Promises for easy async/await usage
- **Command queueing** - Commands are automatically queued when offline
- **Reliable connections** - Robust error handling and reconnection
- **Bidirectional communication** - Full-duplex WebSocket communication
- **Latency monitoring** - Built-in ping/pong and latency measurement
- **Room-based messaging** - Group connections into rooms for targeted broadcasts
- **Lightweight** - Minimal dependencies
## Server
```typescript ```typescript
import { KeepAliveServer, WSContext } from "@prsm/keepalive-ws/server"; import { KeepAliveServer, WSContext } from "@prsm/keepalive-ws/server";
const ws = new KeepAliveServer({ // Create a server instance
// Where to mount this server and listen to messages. const server = new KeepAliveServer({
path: "/", port: 8080,
// How often to send ping messages to connected clients. pingInterval: 30000,
pingInterval: 30_000, latencyInterval: 5000,
// Calculate round-trip time and send latency updates
// to clients every 5s.
latencyInterval: 5_000,
}); });
ws.registerCommand( // Register command handlers
"authenticate", server.registerCommand("echo", async (context) => {
async (c: WSContext<{ token: string >}) => { return `Echo: ${context.payload}`;
const { token } = c.payload; });
// use c.payload to authenticate c.connection
return { ok: true, token };
},
);
ws.registerCommand( // Error handling
"throws", server.registerCommand("throws", async () => {
async (c: WSContext<unknown>) => { throw new Error("Something went wrong");
throw new Error("oops"); });
},
); // Room-based messaging
server.registerCommand("join-room", async (context) => {
const { roomName } = context.payload;
server.addToRoom(roomName, context.connection);
server.broadcastRoom(roomName, "user-joined", {
id: context.connection.id
});
return { success: true };
});
// Broadcasting to all clients
server.registerCommand("broadcast", async (context) => {
server.broadcast("announcement", context.payload);
return { sent: true };
});
``` ```
Extended API: ## Client
- Rooms
It can be useful to collect connections into rooms.
- `addToRoom(roomName: string, connection: Connection): void`
- `removeFromRoom(roomName: string, connection: Connection): void`
- `getRoom(roomName: string): Connection[]`
- `clearRoom(roomName: string): void`
- Command middleware
- Broadcasting to:
- all
- `broadcast(command: string, payload: any, connections?: Connection[]): void`
- all connections that share the same IP
- `broadcastRemoteAddress(c: Connection, command: string, payload: any): void`
- rooms
- `broadcastRoom(roomName: string, command: string, payload: any): void`
### Client
For the browser.
```typescript ```typescript
import { KeepAliveClient } from "@prsm/keepalive-ws/client"; import { KeepAliveClient } from "@prsm/keepalive-ws/client";
const opts = { // Create a client instance
// After 30s (+ maxLatency) of no ping, assume we've disconnected and attempt a const client = new KeepAliveClient("ws://localhost:8080", {
// reconnection if shouldReconnect is true. pingTimeout: 30000,
// This number should be coordinated with the pingInterval from KeepAliveServer. maxLatency: 2000,
pingTimeout: 30_000,
// Try to reconnect whenever we are disconnected.
shouldReconnect: true, shouldReconnect: true,
// This number, added to pingTimeout, is the maximum amount of time reconnectInterval: 2000,
// that can pass before the connection is considered closed.
// In this case, 32s.
maxLatency: 2_000,
// How often to try and connect during reconnection phase.
reconnectInterval: 2_000,
// How many times to try and reconnect before giving up.
maxReconnectAttempts: Infinity, maxReconnectAttempts: Infinity,
};
const ws = new KeepAliveClient("ws://localhost:8080", opts);
const { ok, token } = await ws.command("authenticate", {
username: "user",
password: "pass",
}); });
const result = await ws.command("throws", {}); // Connect to the server (returns a Promise)
// result is: { error: "oops" } await client.connect();
ws.on("latency", (e: CustomEvent<{ latency: number }>) => { // Using Promise-based API
// e.detail.latency is round-trip time in ms try {
const response = await client.command("echo", "Hello world", 5000);
console.log("Response:", response);
} catch (error) {
console.error("Error:", error);
}
// Join a room
await client.command("join-room", { roomName: "lobby" });
// Listen for events
client.on("user-joined", (event) => {
console.log("User joined:", event.detail.id);
}); });
// Monitor latency
client.on("latency", (event) => {
console.log("Current latency:", event.detail.latency, "ms");
});
// Graceful shutdown
await client.close();
```
## Extended Server API
### Room Management
```typescript
// Add a connection to a room
server.addToRoom("roomName", connection);
// Remove a connection from a room
server.removeFromRoom("roomName", connection);
// Get all connections in a room
const roomConnections = server.getRoom("roomName");
// Clear all connections from a room
server.clearRoom("roomName");
```
### Broadcasting
```typescript
// Broadcast to all connections
server.broadcast("eventName", payload);
// Broadcast to specific connections
server.broadcast("eventName", payload, connections);
// Broadcast to all connections except one
server.broadcastExclude(connection, "eventName", payload);
// Broadcast to all connections in a room
server.broadcastRoom("roomName", "eventName", payload);
// Broadcast to all connections in a room except one
server.broadcastRoomExclude("roomName", "eventName", payload, connection);
// Broadcast to all connections with the same IP
server.broadcastRemoteAddress(connection, "eventName", payload);
```
### Middleware
```typescript
// Global middleware for all commands
server.globalMiddlewares.push(async (context) => {
// Validate authentication, etc.
if (!isAuthenticated(context)) {
throw new Error("Unauthorized");
}
});
// Command-specific middleware
server.registerCommand(
"protected-command",
async (context) => {
return "Protected data";
},
[
async (context) => {
// Command-specific validation
if (!hasPermission(context)) {
throw new Error("Forbidden");
}
}
]
);
```
## Graceful Shutdown
```typescript
// Close client connection
await client.close();
// Close server
server.close();
``` ```

Binary file not shown.

View File

@ -33,6 +33,8 @@
"build:server": "tsup src/server/index.ts --format cjs,esm --dts --clean --minify --out-dir dist/server", "build:server": "tsup src/server/index.ts --format cjs,esm --dts --clean --minify --out-dir dist/server",
"build:client": "tsup src/client/index.ts --format cjs,esm --dts --clean --minify --out-dir dist/client", "build:client": "tsup src/client/index.ts --format cjs,esm --dts --clean --minify --out-dir dist/client",
"build": "npm run build:prep && npm run build:server && npm run build:client", "build": "npm run build:prep && npm run build:server && npm run build:client",
"test": "vitest run",
"test:watch": "vitest",
"release": "bumpp package.json && npm publish --access public" "release": "bumpp package.json && npm publish --access public"
}, },
"keywords": [], "keywords": [],
@ -44,6 +46,7 @@
"@types/ws": "^8.5.3", "@types/ws": "^8.5.3",
"bumpp": "^9.1.1", "bumpp": "^9.1.1",
"tsup": "^8.2.4", "tsup": "^8.2.4",
"typescript": "^5.5.4" "typescript": "^5.5.4",
"vitest": "^3.0.9"
} }
} }

View File

@ -1,6 +1,12 @@
import { EventEmitter } from "node:events";
import { WebSocket } from "ws";
import { CodeError } from "../common/codeerror";
import { Status } from "../common/status";
import { Connection } from "./connection"; import { Connection } from "./connection";
type KeepAliveClientOptions = Partial<{ export { Status } from "../common/status";
export type KeepAliveClientOptions = Partial<{
/** /**
* The number of milliseconds to wait before considering the connection closed due to inactivity. * The number of milliseconds to wait before considering the connection closed due to inactivity.
* When this happens, the connection will be closed and a reconnect will be attempted if @see KeepAliveClientOptions.shouldReconnect is true. * When this happens, the connection will be closed and a reconnect will be attempted if @see KeepAliveClientOptions.shouldReconnect is true.
@ -36,58 +42,129 @@ type KeepAliveClientOptions = Partial<{
maxReconnectAttempts: number; maxReconnectAttempts: number;
}>; }>;
const defaultOptions = (opts: KeepAliveClientOptions = {}) => { export class KeepAliveClient extends EventEmitter {
opts.pingTimeout = opts.pingTimeout ?? 30_000;
opts.maxLatency = opts.maxLatency ?? 2_000;
opts.shouldReconnect = opts.shouldReconnect ?? true;
opts.reconnectInterval = opts.reconnectInterval ?? 2_000;
opts.maxReconnectAttempts = opts.maxReconnectAttempts ?? Infinity;
return opts;
};
export class KeepAliveClient extends EventTarget {
connection: Connection; connection: Connection;
url: string; url: string;
socket: WebSocket; socket: WebSocket | null = null;
pingTimeout: ReturnType<typeof setTimeout>; pingTimeout: ReturnType<typeof setTimeout>;
options: KeepAliveClientOptions; options: Required<KeepAliveClientOptions>;
isReconnecting = false; isReconnecting = false;
private _status: Status = Status.OFFLINE;
constructor(url: string, opts: KeepAliveClientOptions = {}) { constructor(url: string, opts: KeepAliveClientOptions = {}) {
super(); super();
this.url = url; this.url = url;
this.socket = new WebSocket(url); this.connection = new Connection(null);
this.connection = new Connection(this.socket); this.options = {
this.options = defaultOptions(opts); pingTimeout: opts.pingTimeout ?? 30_000,
this.applyListeners(); maxLatency: opts.maxLatency ?? 2_000,
shouldReconnect: opts.shouldReconnect ?? true,
reconnectInterval: opts.reconnectInterval ?? 2_000,
maxReconnectAttempts: opts.maxReconnectAttempts ?? Infinity,
};
this.setupConnectionEvents();
} }
get on() { get status(): Status {
return this.connection.addEventListener.bind(this.connection); return this._status;
} }
applyListeners() { private setupConnectionEvents(): void {
this.connection.addEventListener("connection", () => { // Forward relevant events from connection to client
this.heartbeat(); this.connection.on("message", (data) => {
this.emit("message", data);
}); });
this.connection.addEventListener("close", () => { this.connection.on("close", () => {
this._status = Status.OFFLINE;
this.emit("close");
this.reconnect(); this.reconnect();
}); });
this.connection.addEventListener("ping", () => { this.connection.on("error", (error) => {
this.heartbeat(); this.emit("error", error);
}); });
this.connection.addEventListener( this.connection.on("ping", () => {
"message", this.heartbeat();
(ev: CustomEventInit<unknown>) => { this.emit("ping");
this.dispatchEvent(new CustomEvent("message", ev)); });
},
); this.connection.on("latency", (data) => {
this.emit("latency", data);
});
} }
heartbeat() { /**
* Connect to the WebSocket server.
* @returns A promise that resolves when the connection is established.
*/
connect(): Promise<void> {
if (this._status === Status.ONLINE) {
return Promise.resolve();
}
if (
this._status === Status.CONNECTING ||
this._status === Status.RECONNECTING
) {
return new Promise((resolve, reject) => {
const onConnect = () => {
this.removeListener("connect", onConnect);
this.removeListener("error", onError);
resolve();
};
const onError = (error: Error) => {
this.removeListener("connect", onConnect);
this.removeListener("error", onError);
reject(error);
};
this.once("connect", onConnect);
this.once("error", onError);
});
}
this._status = Status.CONNECTING;
return new Promise((resolve, reject) => {
try {
// Create a new WebSocket connection
this.socket = new WebSocket(this.url);
// Set up a direct onopen handler to ensure we catch the connection event
this.socket.onopen = () => {
this._status = Status.ONLINE;
this.connection.socket = this.socket;
this.connection.status = Status.ONLINE;
this.connection.applyListeners();
this.heartbeat();
this.emit("connect");
resolve();
};
// Set up a direct onerror handler for immediate connection errors
this.socket.onerror = (error) => {
this._status = Status.OFFLINE;
reject(
new CodeError(
"WebSocket connection error",
"ECONNECTION",
"ConnectionError",
),
);
};
} catch (error) {
this._status = Status.OFFLINE;
reject(error);
}
});
}
heartbeat(): void {
clearTimeout(this.pingTimeout); clearTimeout(this.pingTimeout);
this.pingTimeout = setTimeout(() => { this.pingTimeout = setTimeout(() => {
@ -100,23 +177,45 @@ export class KeepAliveClient extends EventTarget {
/** /**
* Disconnect the client from the server. * Disconnect the client from the server.
* The client will not attempt to reconnect. * The client will not attempt to reconnect.
* To reconnect, create a new KeepAliveClient. * @returns A promise that resolves when the connection is closed.
*/ */
disconnect() { close(): Promise<void> {
this.options.shouldReconnect = false; this.options.shouldReconnect = false;
if (this._status === Status.OFFLINE) {
return Promise.resolve();
}
return new Promise((resolve) => {
const onClose = () => {
this.removeListener("close", onClose);
this._status = Status.OFFLINE;
resolve();
};
this.once("close", onClose);
clearTimeout(this.pingTimeout);
if (this.socket) { if (this.socket) {
this.socket.close(); this.socket.close();
} }
});
clearTimeout(this.pingTimeout);
} }
private async reconnect() { /**
* @deprecated Use close() instead
*/
disconnect(): Promise<void> {
return this.close();
}
private reconnect(): void {
if (!this.options.shouldReconnect || this.isReconnecting) { if (!this.options.shouldReconnect || this.isReconnecting) {
return; return;
} }
this._status = Status.RECONNECTING;
this.isReconnecting = true; this.isReconnecting = true;
let attempt = 1; let attempt = 1;
@ -124,11 +223,14 @@ export class KeepAliveClient extends EventTarget {
if (this.socket) { if (this.socket) {
try { try {
this.socket.close(); this.socket.close();
} catch (e) {} } catch (e) {
// Ignore errors during close
}
} }
const connect = () => { const connect = () => {
this.socket = new WebSocket(this.url); this.socket = new WebSocket(this.url);
this.socket.onerror = () => { this.socket.onerror = () => {
attempt++; attempt++;
@ -136,37 +238,56 @@ export class KeepAliveClient extends EventTarget {
setTimeout(connect, this.options.reconnectInterval); setTimeout(connect, this.options.reconnectInterval);
} else { } else {
this.isReconnecting = false; this.isReconnecting = false;
this._status = Status.OFFLINE;
this.connection.dispatchEvent(new Event("reconnectfailed")); this.emit("reconnectfailed");
this.connection.dispatchEvent(new Event("reconnectionfailed"));
} }
}; };
this.socket.onopen = () => { this.socket.onopen = () => {
this.isReconnecting = false; this.isReconnecting = false;
this._status = Status.ONLINE;
this.connection.socket = this.socket; this.connection.socket = this.socket;
this.connection.status = Status.ONLINE;
this.connection.applyListeners(true); this.connection.applyListeners(true);
this.heartbeat();
this.connection.dispatchEvent(new Event("connection")); this.emit("connect");
this.connection.dispatchEvent(new Event("connected")); this.emit("reconnect");
this.connection.dispatchEvent(new Event("connect"));
this.connection.dispatchEvent(new Event("reconnection"));
this.connection.dispatchEvent(new Event("reconnected"));
this.connection.dispatchEvent(new Event("reconnect"));
}; };
}; };
connect(); connect();
} }
async command( /**
* Send a command to the server and wait for a response.
* @param command The command name to send
* @param payload The payload to send with the command
* @param expiresIn Timeout in milliseconds
* @param callback Optional callback function
* @returns A promise that resolves with the command result
*/
command(
command: string, command: string,
payload?: any, payload?: any,
expiresIn?: number, expiresIn: number = 30000,
callback?: Function, callback?: (result: any, error?: Error) => void,
) { ): Promise<any> {
// Ensure we're connected before sending commands
if (this._status !== Status.ONLINE) {
return this.connect()
.then(() =>
this.connection.command(command, payload, expiresIn, callback),
)
.catch((error) => {
if (callback) {
callback(null, error);
return Promise.reject(error);
}
return Promise.reject(error);
});
}
return this.connection.command(command, payload, expiresIn, callback); return this.connection.command(command, payload, expiresIn, callback);
} }
} }

View File

@ -1,261 +1,137 @@
import { EventEmitter } from "node:events";
import { WebSocket } from "ws";
import { CodeError } from "../common/codeerror";
import { Command, parseCommand, stringifyCommand } from "../common/message";
import { Status } from "../common/status";
import { IdManager } from "./ids"; import { IdManager } from "./ids";
import { Queue, QueueItem } from "./queue"; import { Queue } from "./queue";
type Command = { export type LatencyPayload = {
id?: number;
command: string;
payload?: any;
};
type LatencyPayload = {
/** Round trip time in milliseconds. */ /** Round trip time in milliseconds. */
latency: number; latency: number;
}; };
export declare interface Connection extends EventTarget { export class Connection extends EventEmitter {
addEventListener( socket: WebSocket | null = null;
type: "message",
listener: (ev: CustomEvent) => any,
options?: boolean | AddEventListenerOptions,
): void;
/** Emits when a connection is made. */
addEventListener(
type: "connection",
listener: () => any,
options?: boolean | AddEventListenerOptions,
): void;
/** Emits when a connection is made. */
addEventListener(
type: "connected",
listener: () => any,
options?: boolean | AddEventListenerOptions,
): void;
/** Emits when a connection is made. */
addEventListener(
type: "connect",
listener: () => any,
options?: boolean | AddEventListenerOptions,
): void;
/** Emits when a connection is closed. */
addEventListener(
type: "close",
listener: () => any,
options?: boolean | AddEventListenerOptions,
): void;
/** Emits when a connection is closed. */
addEventListener(
type: "closed",
listener: () => any,
options?: boolean | AddEventListenerOptions,
): void;
/** Emits when a connection is closed. */
addEventListener(
type: "disconnect",
listener: () => any,
options?: boolean | AddEventListenerOptions,
): void;
/** Emits when a connection is closed. */
addEventListener(
type: "disconnected",
listener: () => any,
options?: boolean | AddEventListenerOptions,
): void;
/** Emits when a reconnect event is successful. */
addEventListener(
type: "reconnect",
listener: () => any,
options?: boolean | AddEventListenerOptions,
): void;
/** Emits when a reconnect fails after @see KeepAliveClientOptions.maxReconnectAttempts attempts. */
addEventListener(
type: "reconnectfailed",
listener: () => any,
options?: boolean | AddEventListenerOptions,
): void;
/** Emits when a ping message is received from @see KeepAliveServer from `@prsm/keepalive-ws/server`. */
addEventListener(
type: "ping",
listener: (ev: CustomEventInit<{}>) => any,
options?: boolean | AddEventListenerOptions,
): void;
/** Emits when a latency event is received from @see KeepAliveServer from `@prsm/keepalive-ws/server`. */
addEventListener(
type: "latency",
listener: (ev: CustomEventInit<LatencyPayload>) => any,
options?: boolean | AddEventListenerOptions,
): void;
addEventListener(
type: string,
listener: (ev: CustomEvent) => any,
options?: boolean | AddEventListenerOptions,
): void;
}
export class Connection extends EventTarget {
socket: WebSocket;
ids = new IdManager(); ids = new IdManager();
queue = new Queue(); queue = new Queue();
callbacks: { [id: number]: (error: Error | null, result?: any) => void } = {}; callbacks: { [id: number]: (result: any, error?: Error) => void } = {};
status: Status = Status.OFFLINE;
constructor(socket: WebSocket) { constructor(socket: WebSocket | null) {
super(); super();
this.socket = socket; this.socket = socket;
if (socket) {
this.applyListeners(); this.applyListeners();
} }
/**
* Adds an event listener to the target.
* @param event The name of the event to listen for.
* @param listener The function to call when the event is fired.
* @param options An options object that specifies characteristics about the event listener.
*/
on(
event: string,
listener: (ev: CustomEvent) => any,
options?: boolean | AddEventListenerOptions,
) {
this.addEventListener(event, listener, options);
} }
/** get isDead(): boolean {
* Removes the event listener previously registered with addEventListener. return !this.socket || this.socket.readyState !== WebSocket.OPEN;
* @param event A string that specifies the name of the event for which to remove an event listener.
* @param listener The event listener to be removed.
* @param options An options object that specifies characteristics about the event listener.
*/
off(
event: string,
listener: (ev: CustomEvent) => any,
options?: boolean | AddEventListenerOptions,
) {
this.removeEventListener(event, listener, options);
} }
sendToken(cmd: Command, expiresIn: number) { send(command: Command): boolean {
try { try {
this.socket.send(JSON.stringify(cmd)); if (!this.isDead) {
this.socket.send(stringifyCommand(command));
return true;
}
return false;
} catch (e) { } catch (e) {
this.queue.add(cmd, expiresIn); return false;
} }
} }
applyListeners(reconnection = false) { sendWithQueue(command: Command, expiresIn: number): boolean {
const success = this.send(command);
if (!success) {
this.queue.add(command, expiresIn);
}
return success;
}
applyListeners(reconnection = false): void {
if (!this.socket) return;
const drainQueue = () => { const drainQueue = () => {
while (!this.queue.isEmpty) { while (!this.queue.isEmpty) {
const item = this.queue.pop() as QueueItem; const item = this.queue.pop();
this.sendToken(item.value, item.expiresIn); if (item) {
this.send(item.value);
}
} }
}; };
if (reconnection) drainQueue(); if (reconnection) {
// @ts-ignore
this.socket.onopen = (socket: WebSocket, ev: Event): any => {
drainQueue(); drainQueue();
this.dispatchEvent(new Event("connection")); }
this.dispatchEvent(new Event("connected"));
this.dispatchEvent(new Event("connect")); this.socket.onclose = () => {
this.status = Status.OFFLINE;
this.emit("close");
this.emit("disconnect");
}; };
this.socket.onclose = (event: CloseEvent) => { this.socket.onerror = (error) => {
this.dispatchEvent(new Event("close")); this.emit("error", error);
this.dispatchEvent(new Event("closed"));
this.dispatchEvent(new Event("disconnected"));
this.dispatchEvent(new Event("disconnect"));
}; };
this.socket.onmessage = async (event: MessageEvent) => { this.socket.onmessage = (event: any) => {
try { try {
const data = JSON.parse(event.data); const data = parseCommand(event.data as string);
this.dispatchEvent(new CustomEvent("message", { detail: data })); // Emit the raw message event
this.emit("message", data);
// Handle special system commands
if (data.command === "latency:request") { if (data.command === "latency:request") {
this.dispatchEvent( this.emit("latency:request", data.payload);
new CustomEvent<LatencyPayload>("latency:request", { this.command("latency:response", data.payload, null);
detail: { latency: data.payload.latency ?? undefined },
}),
);
this.command(
"latency:response",
{ latency: data.payload.latency ?? undefined },
null,
);
} else if (data.command === "latency") { } else if (data.command === "latency") {
this.dispatchEvent( this.emit("latency", data.payload);
new CustomEvent<LatencyPayload>("latency", {
detail: { latency: data.payload ?? undefined },
}),
);
} else if (data.command === "ping") { } else if (data.command === "ping") {
this.dispatchEvent(new CustomEvent("ping", {})); this.emit("ping");
this.command("pong", {}, null); this.command("pong", {}, null);
} else { } else {
this.dispatchEvent( // Emit command-specific event
new CustomEvent(data.command, { detail: data.payload }), this.emit(data.command, data.payload);
);
} }
if (this.callbacks[data.id]) { // Resolve any pending command promises
this.callbacks[data.id](null, data.payload); if (data.id !== undefined && this.callbacks[data.id]) {
// Always resolve with the payload, even if it contains an error
// This allows the test to check for error properties in the result
this.callbacks[data.id](data.payload);
} }
} catch (e) { } catch (error) {
this.dispatchEvent(new Event("error")); this.emit("error", error);
} }
}; };
} }
async command( command(
command: string, command: string,
payload: any, payload: any,
expiresIn: number = 30_000, expiresIn: number | null = 30_000,
callback: Function | null = null, callback?: (result: any, error?: Error) => void,
) { ): Promise<any> | null {
const id = this.ids.reserve(); const id = this.ids.reserve();
const cmd = { id, command, payload: payload ?? {} }; const cmd: Command = { id, command, payload: payload ?? {} };
this.sendToken(cmd, expiresIn); this.sendWithQueue(cmd, expiresIn || 30000);
if (expiresIn === null) { if (expiresIn === null) {
this.ids.release(id); this.ids.release(id);
delete this.callbacks[id];
return null; return null;
} }
const response = this.createResponsePromise(id); const responsePromise = new Promise<any>((resolve, reject) => {
const timeout = this.createTimeoutPromise(id, expiresIn); this.callbacks[id] = (result: any, error?: Error) => {
if (typeof callback === "function") {
const ret = await Promise.race([response, timeout]);
callback(ret);
return ret;
} else {
return Promise.race([response, timeout]);
}
}
createTimeoutPromise(id: number, expiresIn: number) {
return new Promise((_, reject) => {
setTimeout(() => {
this.ids.release(id); this.ids.release(id);
delete this.callbacks[id]; delete this.callbacks[id];
reject(new Error(`Command ${id} timed out after ${expiresIn}ms.`));
}, expiresIn);
});
}
createResponsePromise(id: number) {
return new Promise((resolve, reject) => {
this.callbacks[id] = (error: Error | null, result?: any) => {
this.ids.release(id);
delete this.callbacks[id];
if (error) { if (error) {
reject(error); reject(error);
} else { } else {
@ -263,5 +139,42 @@ export class Connection extends EventTarget {
} }
}; };
}); });
const timeoutPromise = new Promise<any>((_, reject) => {
setTimeout(() => {
if (this.callbacks[id]) {
this.ids.release(id);
delete this.callbacks[id];
reject(
new CodeError(
`Command timed out after ${expiresIn}ms.`,
"ETIMEOUT",
"TimeoutError",
),
);
}
}, expiresIn);
});
if (typeof callback === "function") {
Promise.race([responsePromise, timeoutPromise])
.then((result) => callback(result))
.catch((error) => callback(null, error));
return responsePromise;
}
return Promise.race([responsePromise, timeoutPromise]);
}
close(): boolean {
if (this.isDead) return false;
try {
this.socket.close();
return true;
} catch (e) {
return false;
}
} }
} }

View File

@ -1,2 +1,3 @@
export { KeepAliveClient } from "./client"; export { KeepAliveClient, Status } from "./client";
export { Connection } from "./connection"; export { Connection } from "./connection";
export { CodeError } from "../common/codeerror";

View File

@ -1,50 +1,48 @@
import { Command } from "../common/message";
export class QueueItem { export class QueueItem {
value: any; value: Command;
expireTime: number; private expiration: number;
constructor(value: any, expiresIn: number) { constructor(value: Command, expiresIn: number) {
this.value = value; this.value = value;
this.expireTime = Date.now() + expiresIn; this.expiration = Date.now() + expiresIn;
} }
get expiresIn() { get expiresIn(): number {
return this.expireTime - Date.now(); return this.expiration - Date.now();
} }
get isExpired() { get isExpired(): boolean {
return Date.now() > this.expireTime; return Date.now() > this.expiration;
} }
} }
export class Queue { export class Queue {
items: any[] = []; private items: QueueItem[] = [];
add(item: any, expiresIn: number) { add(item: Command, expiresIn: number): void {
this.items.push(new QueueItem(item, expiresIn)); this.items.push(new QueueItem(item, expiresIn));
} }
get isEmpty() { get isEmpty(): boolean {
let i = this.items.length; // Remove expired items first
this.items = this.items.filter((item) => !item.isExpired);
while (i--) { return this.items.length === 0;
if (this.items[i].isExpired) {
this.items.splice(i, 1);
} else {
return false;
}
}
return true;
} }
pop(): QueueItem | null { pop(): QueueItem | null {
while (this.items.length) { // Find the first non-expired item
const item = this.items.shift() as QueueItem; while (this.items.length > 0) {
if (!item.isExpired) { const item = this.items.shift();
if (item && !item.isExpired) {
return item; return item;
} }
} }
return null; return null;
} }
clear(): void {
this.items = [];
}
} }

View File

@ -0,0 +1,14 @@
export class CodeError extends Error {
code: string;
name: string;
constructor(message: string, code?: string, name?: string) {
super(message);
if (typeof code === "string") {
this.code = code;
}
if (typeof name === "string") {
this.name = name;
}
}
}

View File

@ -0,0 +1,17 @@
export interface Command {
id?: number;
command: string;
payload: any;
}
export function parseCommand(data: string): Command {
try {
return JSON.parse(data) as Command;
} catch (e) {
return { command: "", payload: {} };
}
}
export function stringifyCommand(command: Command): string {
return JSON.stringify(command);
}

View File

@ -0,0 +1,6 @@
export enum Status {
ONLINE = 3,
CONNECTING = 2,
RECONNECTING = 1,
OFFLINE = 0,
}

View File

@ -1,2 +1,3 @@
export { KeepAliveClient } from "./client"; export { KeepAliveClient, Status } from "./client";
export { KeepAliveServer } from "./server"; export { KeepAliveServer, WSContext } from "./server";
export { CodeError } from "./common/codeerror";

View File

@ -1,19 +0,0 @@
export interface Command {
id?: number;
command: string;
payload: any;
}
export const bufferToCommand = (buffer: Buffer): Command => {
const decoded = new TextDecoder("utf-8").decode(buffer);
if (!decoded) {
return { id: 0, command: "", payload: {} };
}
try {
const parsed = JSON.parse(decoded) as Command;
return { id: parsed.id, command: parsed.command, payload: parsed.payload };
} catch (e) {
return { id: 0, command: "", payload: {} };
}
};

View File

@ -1,10 +1,11 @@
import EventEmitter from "node:events"; import { EventEmitter } from "node:events";
import { IncomingMessage } from "node:http"; import { IncomingMessage } from "node:http";
import { WebSocket } from "ws"; import { WebSocket } from "ws";
import { KeepAliveServerOptions } from "."; import { Command, parseCommand, stringifyCommand } from "../common/message";
import { bufferToCommand, Command } from "./command"; import { Status } from "../common/status";
import { Latency } from "./latency"; import { Latency } from "./latency";
import { Ping } from "./ping"; import { Ping } from "./ping";
import { KeepAliveServerOptions } from "./";
export class Connection extends EventEmitter { export class Connection extends EventEmitter {
id: string; id: string;
@ -14,6 +15,7 @@ export class Connection extends EventEmitter {
ping: Ping; ping: Ping;
remoteAddress: string; remoteAddress: string;
connectionOptions: KeepAliveServerOptions; connectionOptions: KeepAliveServerOptions;
status: Status = Status.ONLINE;
constructor( constructor(
socket: WebSocket, socket: WebSocket,
@ -30,7 +32,11 @@ export class Connection extends EventEmitter {
this.startIntervals(); this.startIntervals();
} }
startIntervals() { get isDead(): boolean {
return !this.socket || this.socket.readyState !== WebSocket.OPEN;
}
startIntervals(): void {
this.latency = new Latency(); this.latency = new Latency();
this.ping = new Ping(); this.ping = new Ping();
@ -50,6 +56,7 @@ export class Connection extends EventEmitter {
this.ping.interval = setInterval(() => { this.ping.interval = setInterval(() => {
if (!this.alive) { if (!this.alive) {
this.emit("close"); this.emit("close");
return;
} }
this.alive = false; this.alive = false;
@ -57,18 +64,24 @@ export class Connection extends EventEmitter {
}, this.connectionOptions.pingInterval); }, this.connectionOptions.pingInterval);
} }
stopIntervals() { stopIntervals(): void {
clearInterval(this.latency.interval); clearInterval(this.latency.interval);
clearInterval(this.ping.interval); clearInterval(this.ping.interval);
} }
applyListeners() { applyListeners(): void {
this.socket.on("close", () => { this.socket.on("close", () => {
this.status = Status.OFFLINE;
this.emit("close"); this.emit("close");
}); });
this.socket.on("message", (buffer: Buffer) => { this.socket.on("error", (error) => {
const command = bufferToCommand(buffer); this.emit("error", error);
});
this.socket.on("message", (data: Buffer) => {
try {
const command = parseCommand(data.toString());
if (command.command === "latency:response") { if (command.command === "latency:response") {
this.latency.onResponse(); this.latency.onResponse();
@ -78,11 +91,34 @@ export class Connection extends EventEmitter {
return; return;
} }
this.emit("message", buffer); this.emit("message", data);
} catch (error) {
this.emit("error", error);
}
}); });
} }
send(cmd: Command) { send(cmd: Command): boolean {
this.socket.send(JSON.stringify(cmd)); if (this.isDead) return false;
try {
this.socket.send(stringifyCommand(cmd));
return true;
} catch (error) {
this.emit("error", error);
return false;
}
}
close(): boolean {
if (this.isDead) return false;
try {
this.socket.close();
return true;
} catch (error) {
this.emit("error", error);
return false;
}
} }
} }

View File

@ -1,117 +1,26 @@
import { IncomingMessage } from "node:http"; import { IncomingMessage } from "node:http";
import { ServerOptions, WebSocket, WebSocketServer } from "ws"; import { ServerOptions, WebSocket, WebSocketServer } from "ws";
import { bufferToCommand } from "./command"; import { CodeError } from "../common/codeerror";
import { Command, parseCommand } from "../common/message";
import { Status } from "../common/status";
import { Connection } from "./connection"; import { Connection } from "./connection";
export declare interface KeepAliveServer extends WebSocketServer { export { Status } from "../common/status";
on( export { Connection } from "./connection";
event: "connection",
handler: (socket: WebSocket, req: IncomingMessage) => void,
): this;
on(event: "connected", handler: (c: Connection) => void): this;
on(event: "close", handler: (c: Connection) => void): this;
on(event: "error", cb: (this: WebSocketServer, error: Error) => void): this;
on(
event: "headers",
cb: (
this: WebSocketServer,
headers: string[],
request: IncomingMessage,
) => void,
): this;
on(
event: string | symbol,
listener: (this: WebSocketServer, ...args: any[]) => void,
): this;
emit(event: "connection", socket: WebSocket, req: IncomingMessage): boolean; export class WSContext<T = any> {
emit(event: "connected", connection: Connection): boolean; server: KeepAliveServer;
emit(event: "close", connection: Connection): boolean;
emit(event: "error", connection: Connection): boolean;
once(
event: "connection",
cb: (
this: WebSocketServer,
socket: WebSocket,
request: IncomingMessage,
) => void,
): this;
once(event: "error", cb: (this: WebSocketServer, error: Error) => void): this;
once(
event: "headers",
cb: (
this: WebSocketServer,
headers: string[],
request: IncomingMessage,
) => void,
): this;
once(event: "close" | "listening", cb: (this: WebSocketServer) => void): this;
once(
event: string | symbol,
listener: (this: WebSocketServer, ...args: any[]) => void,
): this;
off(
event: "connection",
cb: (
this: WebSocketServer,
socket: WebSocket,
request: IncomingMessage,
) => void,
): this;
off(event: "error", cb: (this: WebSocketServer, error: Error) => void): this;
off(
event: "headers",
cb: (
this: WebSocketServer,
headers: string[],
request: IncomingMessage,
) => void,
): this;
off(event: "close" | "listening", cb: (this: WebSocketServer) => void): this;
off(
event: string | symbol,
listener: (this: WebSocketServer, ...args: any[]) => void,
): this;
addListener(
event: "connection",
cb: (client: WebSocket, request: IncomingMessage) => void,
): this;
addListener(event: "error", cb: (err: Error) => void): this;
addListener(
event: "headers",
cb: (headers: string[], request: IncomingMessage) => void,
): this;
addListener(event: "close" | "listening", cb: () => void): this;
addListener(event: string | symbol, listener: (...args: any[]) => void): this;
removeListener(event: "connection", cb: (client: WebSocket) => void): this;
removeListener(event: "error", cb: (err: Error) => void): this;
removeListener(
event: "headers",
cb: (headers: string[], request: IncomingMessage) => void,
): this;
removeListener(event: "close" | "listening", cb: () => void): this;
removeListener(
event: string | symbol,
listener: (...args: any[]) => void,
): this;
}
export class WSContext<T> {
wss: KeepAliveServer;
connection: Connection; connection: Connection;
payload: T; payload: T;
constructor(wss: KeepAliveServer, connection: Connection, payload: any) { constructor(server: KeepAliveServer, connection: Connection, payload: T) {
this.wss = wss; this.server = server;
this.connection = connection; this.connection = connection;
this.payload = payload; this.payload = payload;
} }
} }
export type SocketMiddleware = (c: WSContext<any>) => any | Promise<any>; export type SocketMiddleware = (context: WSContext<any>) => any | Promise<any>;
export type KeepAliveServerOptions = ServerOptions & { export type KeepAliveServerOptions = ServerOptions & {
/** /**
@ -136,34 +45,65 @@ export class KeepAliveServer extends WebSocketServer {
globalMiddlewares: SocketMiddleware[] = []; globalMiddlewares: SocketMiddleware[] = [];
middlewares: { [key: string]: SocketMiddleware[] } = {}; middlewares: { [key: string]: SocketMiddleware[] } = {};
rooms: { [roomName: string]: Set<string> } = {}; rooms: { [roomName: string]: Set<string> } = {};
declare serverOptions: KeepAliveServerOptions; serverOptions: ServerOptions & {
pingInterval: number;
latencyInterval: number;
};
status: Status = Status.OFFLINE;
private _listening: boolean = false;
/**
* Whether the server is currently listening for connections
*/
get listening(): boolean {
return this._listening;
}
constructor(opts: KeepAliveServerOptions) { constructor(opts: KeepAliveServerOptions) {
super({ ...opts }); super(opts);
this.serverOptions = { this.serverOptions = {
...opts, ...opts,
pingInterval: opts.pingInterval ?? 30_000, pingInterval: opts.pingInterval ?? 30_000,
latencyInterval: opts.latencyInterval ?? 5_000, latencyInterval: opts.latencyInterval ?? 5_000,
}; };
this.on("listening", () => {
this._listening = true;
this.status = Status.ONLINE;
});
this.on("close", () => {
this._listening = false;
this.status = Status.OFFLINE;
});
this.applyListeners(); this.applyListeners();
} }
private cleanupConnection(c: Connection) { private cleanupConnection(connection: Connection): void {
c.stopIntervals(); connection.stopIntervals();
delete this.connections[c.id]; delete this.connections[connection.id];
if (this.remoteAddressToConnections[c.remoteAddress]) {
this.remoteAddressToConnections[c.remoteAddress] = if (this.remoteAddressToConnections[connection.remoteAddress]) {
this.remoteAddressToConnections[c.remoteAddress].filter( this.remoteAddressToConnections[connection.remoteAddress] =
(cn) => cn.id !== c.id, this.remoteAddressToConnections[connection.remoteAddress].filter(
(conn) => conn.id !== connection.id,
); );
}
if (!this.remoteAddressToConnections[c.remoteAddress].length) { if (
delete this.remoteAddressToConnections[c.remoteAddress]; this.remoteAddressToConnections[connection.remoteAddress].length === 0
) {
delete this.remoteAddressToConnections[connection.remoteAddress];
} }
} }
private applyListeners() { // Remove from all rooms
Object.keys(this.rooms).forEach((roomName) => {
this.rooms[roomName].delete(connection.id);
});
}
private applyListeners(): void {
this.on("connection", (socket: WebSocket, req: IncomingMessage) => { this.on("connection", (socket: WebSocket, req: IncomingMessage) => {
const connection = new Connection(socket, req, this.serverOptions); const connection = new Connection(socket, req, this.serverOptions);
this.connections[connection.id] = connection; this.connections[connection.id] = connection;
@ -178,44 +118,47 @@ export class KeepAliveServer extends WebSocketServer {
this.emit("connected", connection); this.emit("connected", connection);
connection.once("close", () => { connection.on("close", () => {
this.cleanupConnection(connection); this.cleanupConnection(connection);
this.emit("close", connection); this.emit("close", connection);
if (socket.readyState === WebSocket.OPEN) {
socket.close();
}
Object.keys(this.rooms).forEach((roomName) => {
this.rooms[roomName].delete(connection.id);
}); });
connection.on("error", (error) => {
this.emit("clientError", error);
}); });
connection.on("message", (buffer: Buffer) => { connection.on("message", (buffer: Buffer) => {
try { try {
const { id, command, payload } = bufferToCommand(buffer); const data = buffer.toString();
this.runCommand(id ?? 0, command, payload, connection); const command = parseCommand(data);
} catch (e) {
this.emit("error", e); if (command.id !== undefined) {
this.runCommand(
command.id,
command.command,
command.payload,
connection,
);
}
} catch (error) {
this.emit("error", error);
} }
}); });
}); });
} }
broadcast(command: string, payload: any, connections?: Connection[]) { broadcast(command: string, payload: any, connections?: Connection[]): void {
const cmd = JSON.stringify({ command, payload }); const cmd: Command = { command, payload };
if (connections) { if (connections) {
connections.forEach((c) => { connections.forEach((connection) => {
c.socket.send(cmd); connection.send(cmd);
});
} else {
Object.values(this.connections).forEach((connection) => {
connection.send(cmd);
}); });
return;
} }
Object.values(this.connections).forEach((c) => {
c.socket.send(cmd);
});
} }
/** /**
@ -226,14 +169,21 @@ export class KeepAliveServer extends WebSocketServer {
* - Push notifications. * - Push notifications.
* - Auth changes, e.g., logging out in one tab should log you out in all tabs. * - Auth changes, e.g., logging out in one tab should log you out in all tabs.
*/ */
broadcastRemoteAddress(c: Connection, command: string, payload: any) { broadcastRemoteAddress(
const cmd = JSON.stringify({ command, payload }); connection: Connection,
this.remoteAddressToConnections[c.remoteAddress].forEach((cn) => { command: string,
cn.socket.send(cmd); payload: any,
): void {
const cmd: Command = { command, payload };
const connections =
this.remoteAddressToConnections[connection.remoteAddress] || [];
connections.forEach((conn) => {
conn.send(cmd);
}); });
} }
broadcastRemoteAddressById(id: string, command: string, payload: any) { broadcastRemoteAddressById(id: string, command: string, payload: any): void {
const connection = this.connections[id]; const connection = this.connections[id];
if (connection) { if (connection) {
this.broadcastRemoteAddress(connection, command, payload); this.broadcastRemoteAddress(connection, command, payload);
@ -244,8 +194,8 @@ export class KeepAliveServer extends WebSocketServer {
* Given a roomName, a command and a payload, broadcasts to all Connections * Given a roomName, a command and a payload, broadcasts to all Connections
* that are in the room. * that are in the room.
*/ */
broadcastRoom(roomName: string, command: string, payload: any) { broadcastRoom(roomName: string, command: string, payload: any): void {
const cmd = JSON.stringify({ command, payload }); const cmd: Command = { command, payload };
const room = this.rooms[roomName]; const room = this.rooms[roomName];
if (!room) return; if (!room) return;
@ -253,7 +203,7 @@ export class KeepAliveServer extends WebSocketServer {
room.forEach((connectionId) => { room.forEach((connectionId) => {
const connection = this.connections[connectionId]; const connection = this.connections[connectionId];
if (connection) { if (connection) {
connection.socket.send(cmd); connection.send(cmd);
} }
}); });
} }
@ -267,8 +217,8 @@ export class KeepAliveServer extends WebSocketServer {
command: string, command: string,
payload: any, payload: any,
connection: Connection | Connection[], connection: Connection | Connection[],
) { ): void {
const cmd = JSON.stringify({ command, payload }); const cmd: Command = { command, payload };
const room = this.rooms[roomName]; const room = this.rooms[roomName];
if (!room) return; if (!room) return;
@ -281,7 +231,7 @@ export class KeepAliveServer extends WebSocketServer {
if (!excludeIds.includes(connectionId)) { if (!excludeIds.includes(connectionId)) {
const conn = this.connections[connectionId]; const conn = this.connections[connectionId];
if (conn) { if (conn) {
conn.socket.send(cmd); conn.send(cmd);
} }
} }
}); });
@ -291,111 +241,157 @@ export class KeepAliveServer extends WebSocketServer {
* Given a connection, broadcasts a message to all connections except * Given a connection, broadcasts a message to all connections except
* the provided connection. * the provided connection.
*/ */
broadcastExclude(connection: Connection, command: string, payload: any) { broadcastExclude(
const cmd = JSON.stringify({ command, payload }); connection: Connection,
Object.values(this.connections).forEach((c) => { command: string,
if (c.id !== connection.id) { payload: any,
c.socket.send(cmd); ): void {
const cmd: Command = { command, payload };
Object.values(this.connections).forEach((conn) => {
if (conn.id !== connection.id) {
conn.send(cmd);
} }
}); });
} }
/** /**
* @example * Add a connection to a room
* ```typescript
* server.registerCommand("join:room", async (payload: { roomName: string }, connection: Connection) => {
* server.addToRoom(payload.roomName, connection);
* server.broadcastRoom(payload.roomName, "joined", { roomName: payload.roomName });
* });
* ```
*/ */
addToRoom(roomName: string, connection: Connection) { addToRoom(roomName: string, connection: Connection): void {
this.rooms[roomName] = this.rooms[roomName] ?? new Set(); this.rooms[roomName] = this.rooms[roomName] ?? new Set();
this.rooms[roomName].add(connection.id); this.rooms[roomName].add(connection.id);
} }
removeFromRoom(roomName: string, connection: Connection) { /**
* Remove a connection from a room
*/
removeFromRoom(roomName: string, connection: Connection): void {
if (!this.rooms[roomName]) return; if (!this.rooms[roomName]) return;
this.rooms[roomName].delete(connection.id); this.rooms[roomName].delete(connection.id);
} }
removeFromAllRooms(connection: Connection | string) { /**
const connectionId = typeof connection === "string" ? connection : connection.id; * Remove a connection from all rooms
*/
removeFromAllRooms(connection: Connection | string): void {
const connectionId =
typeof connection === "string" ? connection : connection.id;
Object.keys(this.rooms).forEach((roomName) => { Object.keys(this.rooms).forEach((roomName) => {
this.rooms[roomName].delete(connectionId); this.rooms[roomName].delete(connectionId);
}); });
} }
/** /**
* Returns a "room", which is simply a Set of Connection ids. * Returns all connections in a room
* @param roomName
*/ */
getRoom(roomName: string): Connection[] { getRoom(roomName: string): Connection[] {
const ids = this.rooms[roomName] || new Set(); const ids = this.rooms[roomName] || new Set();
return Array.from(ids).map((id) => this.connections[id]); return Array.from(ids)
.map((id) => this.connections[id])
.filter(Boolean);
} }
clearRoom(roomName: string) { /**
* Clear all connections from a room
*/
clearRoom(roomName: string): void {
this.rooms[roomName] = new Set(); this.rooms[roomName] = new Set();
} }
registerCommand<T>( /**
* Register a command handler
*/
async registerCommand<T = any>(
command: string, command: string,
callback: (context: WSContext<any>) => Promise<T> | T, callback: (context: WSContext<any>) => Promise<T> | T,
middlewares: SocketMiddleware[] = [], middlewares: SocketMiddleware[] = [],
) { ): Promise<void> {
this.commands[command] = callback; this.commands[command] = callback;
if (middlewares.length > 0) {
this.prependMiddlewareToCommand(command, middlewares); this.prependMiddlewareToCommand(command, middlewares);
} }
prependMiddlewareToCommand(command: string, middlewares: SocketMiddleware[]) { return Promise.resolve();
}
/**
* Add middleware to be executed before a command
*/
prependMiddlewareToCommand(
command: string,
middlewares: SocketMiddleware[],
): void {
if (middlewares.length) { if (middlewares.length) {
this.middlewares[command] = this.middlewares[command] || []; this.middlewares[command] = this.middlewares[command] || [];
this.middlewares[command] = middlewares.concat(this.middlewares[command]); this.middlewares[command] = middlewares.concat(this.middlewares[command]);
} }
} }
appendMiddlewareToCommand(command: string, middlewares: SocketMiddleware[]) { /**
* Add middleware to be executed after other middleware but before the command
*/
appendMiddlewareToCommand(
command: string,
middlewares: SocketMiddleware[],
): void {
if (middlewares.length) { if (middlewares.length) {
this.middlewares[command] = this.middlewares[command] || []; this.middlewares[command] = this.middlewares[command] || [];
this.middlewares[command] = this.middlewares[command].concat(middlewares); this.middlewares[command] = this.middlewares[command].concat(middlewares);
} }
} }
/**
* Execute a command with the given id, name, payload and connection
*/
private async runCommand( private async runCommand(
id: number, id: number,
command: string, command: string,
payload: any, payload: any,
connection: Connection, connection: Connection,
) { ): Promise<void> {
const c = new WSContext(this, connection, payload); const context = new WSContext(this, connection, payload);
try { try {
if (!this.commands[command]) { if (!this.commands[command]) {
// An onslaught of commands that don't exist is a sign of a bad throw new CodeError(
// or otherwise misconfigured client. `Command [${command}] not found.`,
throw new Error(`Command [${command}] not found.`); "ENOTFOUND",
"CommandError",
);
} }
// Run global middlewares
if (this.globalMiddlewares.length) { if (this.globalMiddlewares.length) {
for (const mw of this.globalMiddlewares) { for (const middleware of this.globalMiddlewares) {
await mw(c); await middleware(context);
} }
} }
// Run command-specific middlewares
if (this.middlewares[command]) { if (this.middlewares[command]) {
for (const mw of this.middlewares[command]) { for (const middleware of this.middlewares[command]) {
await mw(c); await middleware(context);
} }
} }
const result = await this.commands[command](c); // Execute the command
const result = await this.commands[command](context);
connection.send({ id, command, payload: result }); connection.send({ id, command, payload: result });
} catch (e) { } catch (error) {
const payload = { error: e.message ?? e ?? "Unknown error" }; // Handle and serialize errors
connection.send({ id, command, payload }); const errorPayload =
} error instanceof Error
} ? {
error: error.message,
code: (error as CodeError).code || "ESERVER",
name: error.name || "Error",
} }
: { error: String(error) };
export { Connection }; connection.send({ id, command, payload: errorPayload });
}
}
}

View File

@ -0,0 +1,161 @@
import { describe, test, expect, beforeEach, afterEach } from "vitest";
import { KeepAliveClient, Status } from "../src/client/client";
import { KeepAliveServer } from "../src/server/index";
// Helper to create a WebSocket server for testing
const createTestServer = (port: number) => {
return new KeepAliveServer({
port,
pingInterval: 1000, // Faster for testing
latencyInterval: 500, // Faster for testing
});
};
describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => {
const port = 8125;
let server: KeepAliveServer;
let client: KeepAliveClient;
beforeEach(async () => {
server = createTestServer(port);
// Wait for the server to start
await new Promise<void>((resolve) => {
server.on("listening", () => {
resolve();
});
// In case the server is already listening
if (server.listening) {
resolve();
}
});
client = new KeepAliveClient(`ws://localhost:${port}`);
});
afterEach(async () => {
// Close connections in order
if (client.status === Status.ONLINE) {
await client.close();
}
// Close the server
return new Promise<void>((resolve) => {
if (server) {
server.close(() => {
resolve();
});
} else {
resolve();
}
});
});
test("command times out when server doesn't respond", async () => {
await server.registerCommand("never-responds", async () => {
return new Promise(() => {});
});
await client.connect();
// Expect it to fail after a short timeout
await expect(
client.command("never-responds", "Should timeout", 500),
).rejects.toThrow(/timed out/);
}, 2000);
test("server errors are properly serialized to client", async () => {
await server.registerCommand("throws-error", async () => {
throw new Error("Custom server error");
});
await client.connect();
// Expect to receive this error
const result = await client.command("throws-error", "Will error", 1000);
expect(result).toHaveProperty("error", "Custom server error");
}, 2000);
test("multiple concurrent commands are handled correctly", async () => {
// Register commands with different delays
await server.registerCommand("fast", async (context) => {
await new Promise((r) => setTimeout(r, 50));
return `Fast: ${context.payload}`;
});
await server.registerCommand("slow", async (context) => {
await new Promise((r) => setTimeout(r, 150));
return `Slow: ${context.payload}`;
});
await server.registerCommand("echo", async (context) => {
return `Echo: ${context.payload}`;
});
await client.connect();
// Send multiple commands concurrently
const results = await Promise.all([
client.command("fast", "First", 1000),
client.command("slow", "Second", 1000),
client.command("echo", "Third", 1000),
]);
// Verify all commands completed successfully
expect(results).toEqual(["Fast: First", "Slow: Second", "Echo: Third"]);
}, 3000);
test("handles large payloads correctly", async () => {
await server.registerCommand("echo", async (context) => {
return context.payload;
});
await client.connect();
const largeData = {
array: Array(1000)
.fill(0)
.map((_, i) => `item-${i}`),
nested: {
deep: {
object: {
with: "lots of data",
},
},
},
};
const result = await client.command("echo", largeData, 5000);
// Verify the response contains the expected data
expect(result).toEqual(largeData);
}, 10000);
test("server handles multiple client connections", async () => {
await server.registerCommand("echo", async (context) => {
return `Echo: ${context.payload}`;
});
// Create multiple clients
const clients = Array(5)
.fill(0)
.map(() => new KeepAliveClient(`ws://localhost:${port}`));
// Connect all clients
await Promise.all(clients.map((client) => client.connect()));
// Send a command from each client
const results = await Promise.all(
clients.map((client, i) => client.command("echo", `Client ${i}`, 1000)),
);
// Verify all commands succeeded
results.forEach((result, i) => {
expect(result).toBe(`Echo: Client ${i}`);
});
// Clean up
await Promise.all(clients.map((client) => client.close()));
}, 5000);
});

View File

@ -0,0 +1,97 @@
import { describe, test, expect, beforeEach, afterEach } from "vitest";
import { KeepAliveClient, Status } from "../src/client/client";
import { KeepAliveServer } from "../src/server/index";
import { WebSocket, WebSocketServer } from "ws";
// Helper to create a WebSocket server for testing
const createTestServer = (port: number) => {
return new KeepAliveServer({
port,
pingInterval: 1000, // Faster for testing
latencyInterval: 500, // Faster for testing
});
};
describe("Basic KeepAliveClient and KeepAliveServer Tests", () => {
const port = 8124;
let server: KeepAliveServer;
let client: KeepAliveClient;
beforeEach(async () => {
server = createTestServer(port);
// Wait for the server to start
await new Promise<void>((resolve) => {
server.on("listening", () => {
resolve();
});
// In case the server is already listening
if (server.listening) {
resolve();
}
});
client = new KeepAliveClient(`ws://localhost:${port}`);
});
afterEach(async () => {
// Close connections in order
if (client.status === Status.ONLINE) {
await client.close();
}
// Close the server
return new Promise<void>((resolve) => {
if (server) {
server.close(() => {
resolve();
});
} else {
resolve();
}
});
});
test("client-server connection should be online", async () => {
await server.registerCommand("echo", async (context) => {
return context.payload;
});
await client.connect();
expect(client.status).toBe(Status.ONLINE);
}, 10000);
test("simple echo command", async () => {
await server.registerCommand("echo", async (context) => {
return `Echo: ${context.payload}`;
});
await client.connect();
const result = await client.command("echo", "Hello", 5000);
expect(result).toBe("Echo: Hello");
}, 10000);
test("connect should resolve when already connected", async () => {
await server.registerCommand("echo", async (context) => {
return context.payload;
});
await client.connect();
expect(client.status).toBe(Status.ONLINE);
// Second connect should resolve immediately
await client.connect();
expect(client.status).toBe(Status.ONLINE);
}, 10000);
test("close should resolve when already closed", async () => {
await client.close();
expect(client.status).toBe(Status.OFFLINE);
// Second close should resolve immediately
await client.close();
expect(client.status).toBe(Status.OFFLINE);
}, 10000);
});