refactor for maintainability and modularity

This commit is contained in:
nvms 2025-04-20 17:05:32 -04:00
parent 57af00dc40
commit 9fbd947ad1
16 changed files with 2034 additions and 1293 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,212 @@
import type { Connection } from "../connection";
import type { Command } from "../../common/message";
import type { ConnectionManager } from "./connection";
import type { RoomManager } from "./room";
import type Redis from "ioredis";
export class BroadcastManager {
private connectionManager: ConnectionManager;
private roomManager: RoomManager;
private instanceId: string;
private pubClient: Redis;
private getPubSubChannel: (instanceId: string) => string;
private emitError: (error: Error) => void;
constructor(
connectionManager: ConnectionManager,
roomManager: RoomManager,
instanceId: string,
pubClient: any,
getPubSubChannel: (instanceId: string) => string,
emitError: (error: Error) => void
) {
this.connectionManager = connectionManager;
this.roomManager = roomManager;
this.instanceId = instanceId;
this.pubClient = pubClient;
this.getPubSubChannel = getPubSubChannel;
this.emitError = emitError;
}
/**
* Broadcasts a command and payload to a set of connections or all available connections.
*
* @param {string} command - The command to be broadcasted.
* @param {any} payload - The data associated with the command.
* @param {Connection[]=} connections - (Optional) A specific list of connections to broadcast to. If not provided, the command will be sent to all connections.
*
* @throws {Error} Emits an "error" event if broadcasting fails.
*/
async broadcast(command: string, payload: any, connections?: Connection[]) {
const cmd: Command = { command, payload };
try {
if (connections) {
const allConnectionIds = connections.map(({ id }) => id);
const connectionIds =
await this.connectionManager.getAllConnectionIds();
const filteredIds = allConnectionIds.filter((id) =>
connectionIds.includes(id)
);
await this.publishOrSend(filteredIds, cmd);
} else {
const allConnectionIds =
await this.connectionManager.getAllConnectionIds();
await this.publishOrSend(allConnectionIds, cmd);
}
} catch (err) {
this.emitError(
new Error(`Failed to broadcast command "${command}": ${err}`)
);
}
}
/**
* Broadcasts a command and associated payload to all active connections within the specified room.
*
* @param {string} roomName - The name of the room whose connections will receive the broadcast.
* @param {string} command - The command to be broadcasted to the connections.
* @param {unknown} payload - The data payload associated with the command.
* @returns {Promise<void>} A promise that resolves when the broadcast operation is complete.
* @throws {Error} If the broadcast operation fails, an error is thrown and the promise is rejected.
*/
async broadcastRoom(
roomName: string,
command: string,
payload: any
): Promise<void> {
const connectionIds = await this.roomManager.getRoomConnectionIds(roomName);
try {
await this.publishOrSend(connectionIds, { command, payload });
} catch (err) {
this.emitError(
new Error(`Failed to broadcast command "${command}": ${err}`)
);
}
}
/**
* Broadcasts a command and payload to all active connections except for the specified one(s).
* Excludes the provided connection(s) from receiving the broadcast.
*
* @param {string} command - The command to broadcast to connections.
* @param {any} payload - The payload to send along with the command.
* @param {Connection | Connection[]} exclude - A single connection or an array of connections to exclude from the broadcast.
* @returns {Promise<void>} A promise that resolves when the broadcast is complete.
* @emits {Error} Emits an "error" event if broadcasting the command fails.
*/
async broadcastExclude(
command: string,
payload: any,
exclude: Connection | Connection[]
): Promise<void> {
const excludedIds = new Set(
(Array.isArray(exclude) ? exclude : [exclude]).map(({ id }) => id)
);
try {
const connectionIds = (
await this.connectionManager.getAllConnectionIds()
).filter((id: string) => !excludedIds.has(id));
await this.publishOrSend(connectionIds, { command, payload });
} catch (err) {
this.emitError(
new Error(`Failed to broadcast command "${command}": ${err}`)
);
}
}
/**
* Broadcasts a command with a payload to all connections in a specified room,
* excluding one or more given connections. If the broadcast fails, emits an error event.
*
* @param {string} roomName - The name of the room to broadcast to.
* @param {string} command - The command to broadcast.
* @param {any} payload - The payload to send with the command.
* @param {Connection | Connection[]} exclude - A connection or array of connections to exclude from the broadcast.
* @returns {Promise<void>} A promise that resolves when the broadcast is complete.
* @emits {Error} Emits an error event if broadcasting fails.
*/
async broadcastRoomExclude(
roomName: string,
command: string,
payload: any,
exclude: Connection | Connection[]
): Promise<void> {
const excludedIds = new Set(
(Array.isArray(exclude) ? exclude : [exclude]).map(({ id }) => id)
);
try {
const connectionIds = (
await this.roomManager.getRoomConnectionIds(roomName)
).filter((id: string) => !excludedIds.has(id));
await this.publishOrSend(connectionIds, { command, payload });
} catch (err) {
this.emitError(
new Error(`Failed to broadcast command "${command}": ${err}`)
);
}
}
private async publishOrSend(connectionIds: string[], command: Command) {
if (connectionIds.length === 0) {
return;
}
// get instance mapping for the target connection IDs
const connectionInstanceMapping =
await this.connectionManager.getInstanceIdsForConnections(connectionIds);
const instanceMap: { [instanceId: string]: string[] } = {};
// group connection IDs by instance ID
for (const connectionId of connectionIds) {
const instanceId = connectionInstanceMapping[connectionId];
if (instanceId) {
if (!instanceMap[instanceId]) {
instanceMap[instanceId] = [];
}
instanceMap[instanceId].push(connectionId);
}
}
// publish command to each instance
for (const [instanceId, targetConnectionIds] of Object.entries(
instanceMap
)) {
if (targetConnectionIds.length === 0) continue;
if (instanceId === this.instanceId) {
// send locally
targetConnectionIds.forEach((connectionId) => {
const connection =
this.connectionManager.getLocalConnection(connectionId);
if (connection && !connection.isDead) {
connection.send(command);
}
});
} else {
// publish to remote instance via pubsub
const messagePayload = {
targetConnectionIds,
command,
};
const message = JSON.stringify(messagePayload);
try {
await this.pubClient.publish(
this.getPubSubChannel(instanceId),
message
);
} catch (err) {
this.emitError(
new Error(`Failed to publish command "${command.command}": ${err}`)
);
}
}
}
}
}

View File

@ -0,0 +1,200 @@
import type { Redis } from "ioredis";
import type { Connection } from "../connection";
import type { ChannelPattern } from "../types";
export class ChannelManager {
private redis: Redis;
private pubClient: Redis;
private subClient: Redis;
private exposedChannels: ChannelPattern[] = [];
private channelGuards: Map<
ChannelPattern,
(connection: Connection, channel: string) => Promise<boolean> | boolean
> = new Map();
private channelSubscriptions: { [channel: string]: Set<Connection> } = {};
private emitError: (error: Error) => void;
constructor(
redis: Redis,
pubClient: Redis,
subClient: Redis,
emitError: (error: Error) => void
) {
this.redis = redis;
this.pubClient = pubClient;
this.subClient = subClient;
this.emitError = emitError;
}
/**
* Exposes a channel for external access and optionally associates a guard function
* to control access to that channel. The guard function determines whether a given
* connection is permitted to access the channel.
*
* @param {ChannelPattern} channel - The channel or pattern to expose.
* @param {(connection: Connection, channel: string) => Promise<boolean> | boolean} [guard] -
* Optional guard function that receives the connection and channel name, returning
* a boolean or a promise that resolves to a boolean indicating whether access is allowed.
* @returns {void}
*/
exposeChannel(
channel: ChannelPattern,
guard?: (
connection: Connection,
channel: string
) => Promise<boolean> | boolean
): void {
this.exposedChannels.push(channel);
if (guard) {
this.channelGuards.set(channel, guard);
}
}
/**
* Checks if a channel is exposed and if the connection has access to it.
*
* @param channel - The channel to check
* @param connection - The connection requesting access
* @returns A promise that resolves to true if the channel is exposed and the connection has access
*/
async isChannelExposed(
channel: string,
connection: Connection
): Promise<boolean> {
const matchedPattern = this.exposedChannels.find((pattern) =>
typeof pattern === "string" ? pattern === channel : pattern.test(channel)
);
if (!matchedPattern) {
return false;
}
const guard = this.channelGuards.get(matchedPattern);
if (guard) {
try {
return await Promise.resolve(guard(connection, channel));
} catch (e) {
return false;
}
}
return true;
}
/**
* Publishes a message to a specified channel and optionally maintains a history of messages.
*
* @param {string} channel - The name of the channel to which the message will be published.
* @param {any} message - The message to be published. Will not be stringified automatically for you. You need to do that yourself.
* @param {number} [history=0] - The number of historical messages to retain for the channel. Defaults to 0, meaning no history is retained.
* If greater than 0, the message will be added to the channel's history and the history will be trimmed to the specified size.
* @returns {Promise<void>} A Promise that resolves once the message has been published and, if applicable, the history has been updated.
* @throws {Error} This function may throw an error if the underlying `pubClient` operations (e.g., `lpush`, `ltrim`, `publish`) fail.
*/
async publishToChannel(
channel: string,
message: any,
history: number = 0
): Promise<void> {
const parsedHistory = parseInt(history as any, 10);
if (!isNaN(parsedHistory) && parsedHistory > 0) {
await this.pubClient.lpush(`history:${channel}`, message);
await this.pubClient.ltrim(`history:${channel}`, 0, parsedHistory);
}
await this.pubClient.publish(channel, message);
}
/**
* Subscribes a connection to a channel
*
* @param channel - The channel to subscribe to
* @param connection - The connection to subscribe
*/
addSubscription(channel: string, connection: Connection): void {
if (!this.channelSubscriptions[channel]) {
this.channelSubscriptions[channel] = new Set();
}
this.channelSubscriptions[channel].add(connection);
}
/**
* Unsubscribes a connection from a channel
*
* @param channel - The channel to unsubscribe from
* @param connection - The connection to unsubscribe
* @returns true if the connection was subscribed and is now unsubscribed, false otherwise
*/
removeSubscription(channel: string, connection: Connection): boolean {
if (this.channelSubscriptions[channel]) {
this.channelSubscriptions[channel].delete(connection);
if (this.channelSubscriptions[channel].size === 0) {
delete this.channelSubscriptions[channel];
}
return true;
}
return false;
}
/**
* Gets all subscribers for a channel
*
* @param channel - The channel to get subscribers for
* @returns A set of connections subscribed to the channel, or undefined if none
*/
getSubscribers(channel: string): Set<Connection> | undefined {
return this.channelSubscriptions[channel];
}
/**
* Subscribes to a Redis channel
*
* @param channel - The channel to subscribe to
* @returns A promise that resolves when the subscription is complete
*/
async subscribeToRedisChannel(channel: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.subClient.subscribe(channel, (err) => {
if (err) reject(err);
else resolve();
});
});
}
/**
* Unsubscribes from a Redis channel
*
* @param channel - The channel to unsubscribe from
* @returns A promise that resolves when the unsubscription is complete
*/
async unsubscribeFromRedisChannel(channel: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.subClient.unsubscribe(channel, (err) => {
if (err) reject(err);
else resolve();
});
});
}
/**
* Gets channel history from Redis
*
* @param channel - The channel to get history for
* @param limit - The maximum number of history items to retrieve
* @returns A promise that resolves to an array of history items
*/
async getChannelHistory(channel: string, limit: number): Promise<string[]> {
const historyKey = `history:${channel}`;
return this.redis.lrange(historyKey, 0, limit - 1);
}
/**
* Cleans up all subscriptions for a connection
*
* @param connection - The connection to clean up
*/
cleanupConnection(connection: Connection): void {
for (const channel in this.channelSubscriptions) {
this.removeSubscription(channel, connection);
}
}
}

View File

@ -0,0 +1,144 @@
import { CodeError } from "../../client";
import { MeshContext } from "../mesh-context";
import type { Connection } from "../connection";
import type { SocketMiddleware } from "../types";
export class CommandManager {
private commands: {
[command: string]: (context: MeshContext<any>) => Promise<any> | any;
} = {};
private globalMiddlewares: SocketMiddleware[] = [];
private middlewares: { [key: string]: SocketMiddleware[] } = {};
private emitError: (error: Error) => void;
constructor(emitError: (error: Error) => void) {
this.emitError = emitError;
}
/**
* Registers a command with an associated callback and optional middleware.
*
* @template T The type for `MeshContext.payload`. Defaults to `any`.
* @template U The command's return value type. Defaults to `any`.
* @param {string} command - The unique identifier for the command to register.
* @param {(context: MeshContext<T>) => Promise<U> | U} callback - The function to execute when the command is invoked. It receives a `MeshContext` of type `T` and may return a value of type `U` or a `Promise` resolving to `U`.
* @param {SocketMiddleware[]} [middlewares=[]] - An optional array of middleware functions to apply to the command. Defaults to an empty array.
* @throws {Error} May throw an error if the command registration or middleware addition fails.
*/
registerCommand<T = any, U = any>(
command: string,
callback: (context: MeshContext<T>) => Promise<U> | U,
middlewares: SocketMiddleware[] = []
) {
this.commands[command] = callback;
if (middlewares.length > 0) {
this.addMiddlewareToCommand(command, middlewares);
}
}
/**
* Adds one or more middleware functions to the global middleware stack.
*
* @param {SocketMiddleware[]} middlewares - An array of middleware functions to be added. Each middleware
* is expected to conform to the `SocketMiddleware` type.
* @returns {void}
* @throws {Error} If the provided middlewares are not valid or fail validation (if applicable).
*/
addMiddleware(...middlewares: SocketMiddleware[]): void {
this.globalMiddlewares.push(...middlewares);
}
/**
* Adds an array of middleware functions to a specific command.
*
* @param {string} command - The name of the command to associate the middleware with.
* @param {SocketMiddleware[]} middlewares - An array of middleware functions to be added to the command.
* @returns {void}
*/
addMiddlewareToCommand(
command: string,
middlewares: SocketMiddleware[]
): void {
if (middlewares.length) {
this.middlewares[command] = this.middlewares[command] || [];
this.middlewares[command] = middlewares.concat(this.middlewares[command]);
}
}
/**
* Runs a command with the given parameters
*
* @param id - The command ID
* @param commandName - The name of the command to run
* @param payload - The payload for the command
* @param connection - The connection that initiated the command
* @param server - The server instance
*/
async runCommand(
id: number,
commandName: string,
payload: any,
connection: Connection,
server: any
) {
const context = new MeshContext(server, commandName, connection, payload);
try {
if (!this.commands[commandName]) {
throw new CodeError(
`Command "${commandName}" not found`,
"ENOTFOUND",
"CommandError"
);
}
if (this.globalMiddlewares.length) {
for (const middleware of this.globalMiddlewares) {
await middleware(context);
}
}
if (this.middlewares[commandName]) {
for (const middleware of this.middlewares[commandName]) {
await middleware(context);
}
}
const result = await this.commands[commandName](context);
connection.send({ id, command: commandName, payload: result });
} catch (err) {
const errorPayload =
err instanceof Error
? {
error: err.message,
code: (err as CodeError).code || "ESERVER",
name: err.name || "Error",
}
: { error: String(err), code: "EUNKNOWN", name: "UnknownError" };
connection.send({ id, command: commandName, payload: errorPayload });
}
}
/**
* Gets all registered commands
*
* @returns An object mapping command names to their handler functions
*/
getCommands(): {
[command: string]: (context: MeshContext<any>) => Promise<any> | any;
} {
return this.commands;
}
/**
* Checks if a command is registered
*
* @param commandName - The name of the command to check
* @returns true if the command is registered, false otherwise
*/
hasCommand(commandName: string): boolean {
return !!this.commands[commandName];
}
}

View File

@ -1,6 +1,6 @@
import type Redis from "ioredis";
import type { Connection } from "./connection";
import type { RoomManager } from "./room-manager";
import type { Connection } from "../connection";
import type { RoomManager } from "./room";
const CONNECTIONS_HASH_KEY = "mesh:connections";
const INSTANCE_CONNECTIONS_KEY_PREFIX = "mesh:connections:";

View File

@ -1,6 +1,6 @@
import type { Redis } from "ioredis";
import type { Connection } from "./connection";
import type { RoomManager } from "./room-manager";
import type { Connection } from "../connection";
import type { RoomManager } from "./room";
type ChannelPattern = string | RegExp;

View File

@ -0,0 +1,223 @@
import type { Redis } from "ioredis";
import type { Connection } from "../connection";
import type { ConnectionManager } from "./connection";
import type { PubSubMessagePayload, RecordUpdatePubSubPayload } from "../types";
import {
PUB_SUB_CHANNEL_PREFIX,
RECORD_PUB_SUB_CHANNEL,
} from "../utils/constants";
export class PubSubManager {
private subClient: Redis;
private instanceId: string;
private connectionManager: ConnectionManager;
private recordSubscriptions: Map<
string, // recordId
Map<string, "patch" | "full"> // connectionId -> mode
>;
private getChannelSubscriptions: (
channel: string
) => Set<Connection> | undefined;
private emitError: (error: Error) => void;
private _subscriptionPromise!: Promise<void>;
constructor(
subClient: Redis,
instanceId: string,
connectionManager: ConnectionManager,
recordSubscriptions: Map<string, Map<string, "patch" | "full">>,
getChannelSubscriptions: (channel: string) => Set<Connection> | undefined,
emitError: (error: Error) => void
) {
this.subClient = subClient;
this.instanceId = instanceId;
this.connectionManager = connectionManager;
this.recordSubscriptions = recordSubscriptions;
this.getChannelSubscriptions = getChannelSubscriptions;
this.emitError = emitError;
}
/**
* Subscribes to the instance channel and sets up message handlers
*
* @returns A promise that resolves when the subscription is complete
*/
subscribeToInstanceChannel(): Promise<void> {
const channel = `${PUB_SUB_CHANNEL_PREFIX}${this.instanceId}`;
this._subscriptionPromise = new Promise((resolve, reject) => {
this.subClient.subscribe(channel, RECORD_PUB_SUB_CHANNEL);
this.subClient.psubscribe("mesh:presence:updates:*", (err) => {
if (err) {
this.emitError(
new Error(
`Failed to subscribe to channels ${channel}, ${RECORD_PUB_SUB_CHANNEL}:`,
{ cause: err }
)
);
reject(err);
return;
}
resolve();
});
});
this.setupMessageHandlers();
return this._subscriptionPromise;
}
/**
* Sets up message handlers for the subscribed channels
*/
private setupMessageHandlers(): void {
this.subClient.on("message", async (channel, message) => {
if (channel.startsWith(PUB_SUB_CHANNEL_PREFIX)) {
this.handleInstancePubSubMessage(channel, message);
} else if (channel === RECORD_PUB_SUB_CHANNEL) {
this.handleRecordUpdatePubSubMessage(message);
} else {
const subscribers = this.getChannelSubscriptions(channel);
if (subscribers) {
for (const connection of subscribers) {
if (!connection.isDead) {
connection.send({
command: "mesh/subscription-message",
payload: { channel, message },
});
}
}
}
}
});
this.subClient.on("pmessage", async (pattern, channel, message) => {
if (pattern === "mesh:presence:updates:*") {
// channel here is the actual channel, e.g., mesh:presence:updates:roomName
const subscribers = this.getChannelSubscriptions(channel);
if (subscribers) {
try {
const payload = JSON.parse(message);
subscribers.forEach((connection: Connection) => {
if (!connection.isDead) {
connection.send({
command: "mesh/presence-update",
payload: payload,
});
} else {
// clean up dead connections from subscription list
subscribers.delete(connection);
}
});
} catch (e) {
this.emitError(
new Error(`Failed to parse presence update: ${message}`)
);
}
}
}
});
}
/**
* Handles messages from the instance PubSub channel
*
* @param channel - The channel the message was received on
* @param message - The message content
*/
private handleInstancePubSubMessage(channel: string, message: string) {
try {
const parsedMessage = JSON.parse(message) as PubSubMessagePayload;
if (
!parsedMessage ||
!Array.isArray(parsedMessage.targetConnectionIds) ||
!parsedMessage.command ||
typeof parsedMessage.command.command !== "string"
) {
throw new Error("Invalid message format");
}
const { targetConnectionIds, command } = parsedMessage;
targetConnectionIds.forEach((connectionId) => {
const connection =
this.connectionManager.getLocalConnection(connectionId);
if (connection && !connection.isDead) {
connection.send(command);
}
});
} catch (err) {
this.emitError(new Error(`Failed to parse message: ${message}`));
}
}
/**
* Handles record update messages from the record PubSub channel
*
* @param message - The message content
*/
private handleRecordUpdatePubSubMessage(message: string) {
try {
const parsedMessage = JSON.parse(message) as RecordUpdatePubSubPayload;
const { recordId, newValue, patch, version } = parsedMessage;
if (!recordId || typeof version !== "number") {
throw new Error("Invalid record update message format");
}
const subscribers = this.recordSubscriptions.get(recordId);
if (!subscribers) {
return;
}
subscribers.forEach((mode, connectionId) => {
const connection =
this.connectionManager.getLocalConnection(connectionId);
if (connection && !connection.isDead) {
if (mode === "patch" && patch) {
connection.send({
command: "mesh/record-update",
payload: { recordId, patch, version },
});
} else if (mode === "full" && newValue !== undefined) {
connection.send({
command: "mesh/record-update",
payload: { recordId, full: newValue, version },
});
}
} else if (!connection) {
subscribers.delete(connectionId);
if (subscribers.size === 0) {
this.recordSubscriptions.delete(recordId);
}
}
});
} catch (err) {
this.emitError(
new Error(`Failed to parse record update message: ${message}`)
);
}
}
/**
* Gets the subscription promise
*
* @returns The subscription promise
*/
getSubscriptionPromise(): Promise<void> {
return this._subscriptionPromise;
}
/**
* Gets the PubSub channel for an instance
*
* @param instanceId - The instance ID
* @returns The PubSub channel name
*/
getPubSubChannel(instanceId: string): string {
return `${PUB_SUB_CHANNEL_PREFIX}${instanceId}`;
}
}

View File

@ -0,0 +1,270 @@
import type { Redis } from "ioredis";
import type { Connection } from "../connection";
import type { ChannelPattern } from "../types";
import type { RecordManager } from "./record";
import type { Operation } from "fast-json-patch";
import { RECORD_PUB_SUB_CHANNEL } from "../utils/constants";
export class RecordSubscriptionManager {
private pubClient: Redis;
private recordManager: RecordManager;
private exposedRecords: ChannelPattern[] = [];
private exposedWritableRecords: ChannelPattern[] = [];
private recordGuards: Map<
ChannelPattern,
(connection: Connection, recordId: string) => Promise<boolean> | boolean
> = new Map();
private writableRecordGuards: Map<
ChannelPattern,
(connection: Connection, recordId: string) => Promise<boolean> | boolean
> = new Map();
private recordSubscriptions: Map<
string, // recordId
Map<string, "patch" | "full"> // connectionId -> mode
> = new Map();
private emitError: (error: Error) => void;
constructor(
pubClient: Redis,
recordManager: RecordManager,
emitError: (error: Error) => void
) {
this.pubClient = pubClient;
this.recordManager = recordManager;
this.emitError = emitError;
}
/**
* Exposes a record or pattern for client subscriptions, optionally adding a guard function.
*
* @param {ChannelPattern} recordPattern - The record ID or pattern to expose.
* @param {(connection: Connection, recordId: string) => Promise<boolean> | boolean} [guard] - Optional guard function.
*/
exposeRecord(
recordPattern: ChannelPattern,
guard?: (
connection: Connection,
recordId: string
) => Promise<boolean> | boolean
): void {
this.exposedRecords.push(recordPattern);
if (guard) {
this.recordGuards.set(recordPattern, guard);
}
}
/**
* Exposes a record or pattern for client writes, optionally adding a guard function.
*
* @param {ChannelPattern} recordPattern - The record ID or pattern to expose as writable.
* @param {(connection: Connection, recordId: string) => Promise<boolean> | boolean} [guard] - Optional guard function.
*/
exposeWritableRecord(
recordPattern: ChannelPattern,
guard?: (
connection: Connection,
recordId: string
) => Promise<boolean> | boolean
): void {
this.exposedWritableRecords.push(recordPattern);
if (guard) {
this.writableRecordGuards.set(recordPattern, guard);
}
}
/**
* Checks if a record is exposed for reading
*
* @param recordId - The record ID to check
* @param connection - The connection requesting access
* @returns A promise that resolves to true if the record is exposed and the connection has access
*/
async isRecordExposed(
recordId: string,
connection: Connection
): Promise<boolean> {
const readPattern = this.exposedRecords.find((pattern) =>
typeof pattern === "string"
? pattern === recordId
: pattern.test(recordId)
);
let canRead = false;
if (readPattern) {
const guard = this.recordGuards.get(readPattern);
if (guard) {
try {
canRead = await Promise.resolve(guard(connection, recordId));
} catch (e) {
canRead = false;
}
} else {
canRead = true;
}
}
if (canRead) {
return true;
}
// if exposed as writable, it is implicitly readable
const writePattern = this.exposedWritableRecords.find((pattern) =>
typeof pattern === "string"
? pattern === recordId
: pattern.test(recordId)
);
// If exposed as writable, it's readable. No need to check the *write* guard here.
if (writePattern) {
return true;
}
return false;
}
/**
* Checks if a record is exposed for writing
*
* @param recordId - The record ID to check
* @param connection - The connection requesting access
* @returns A promise that resolves to true if the record is writable and the connection has access
*/
async isRecordWritable(
recordId: string,
connection: Connection
): Promise<boolean> {
const matchedPattern = this.exposedWritableRecords.find((pattern) =>
typeof pattern === "string"
? pattern === recordId
: pattern.test(recordId)
);
if (!matchedPattern) {
return false;
}
const guard = this.writableRecordGuards.get(matchedPattern);
if (guard) {
try {
return await Promise.resolve(guard(connection, recordId));
} catch (e) {
return false;
}
}
return true;
}
/**
* Subscribes a connection to a record
*
* @param recordId - The record ID to subscribe to
* @param connectionId - The connection ID to subscribe
* @param mode - The subscription mode (patch or full)
*/
addSubscription(
recordId: string,
connectionId: string,
mode: "patch" | "full"
): void {
if (!this.recordSubscriptions.has(recordId)) {
this.recordSubscriptions.set(recordId, new Map());
}
this.recordSubscriptions.get(recordId)!.set(connectionId, mode);
}
/**
* Unsubscribes a connection from a record
*
* @param recordId - The record ID to unsubscribe from
* @param connectionId - The connection ID to unsubscribe
* @returns true if the connection was subscribed and is now unsubscribed, false otherwise
*/
removeSubscription(recordId: string, connectionId: string): boolean {
const recordSubs = this.recordSubscriptions.get(recordId);
if (recordSubs?.has(connectionId)) {
recordSubs.delete(connectionId);
if (recordSubs.size === 0) {
this.recordSubscriptions.delete(recordId);
}
return true;
}
return false;
}
/**
* Gets all subscribers for a record
*
* @param recordId - The record ID to get subscribers for
* @returns A map of connection IDs to subscription modes, or undefined if none
*/
getSubscribers(recordId: string): Map<string, "patch" | "full"> | undefined {
return this.recordSubscriptions.get(recordId);
}
/**
* Updates a record, persists it to Redis, increments its version, computes a patch,
* and publishes the update via Redis pub/sub.
*
* @param {string} recordId - The ID of the record to update.
* @param {any} newValue - The new value for the record.
* @returns {Promise<void>}
* @throws {Error} If the update fails.
*/
async publishRecordUpdate(recordId: string, newValue: any): Promise<void> {
const updateResult = await this.recordManager.publishUpdate(
recordId,
newValue
);
if (!updateResult) {
return;
}
const { patch, version } = updateResult;
const messagePayload = {
recordId,
newValue,
patch,
version,
};
try {
await this.pubClient.publish(
RECORD_PUB_SUB_CHANNEL,
JSON.stringify(messagePayload)
);
} catch (err) {
this.emitError(
new Error(`Failed to publish record update for "${recordId}": ${err}`)
);
}
}
/**
* Cleans up all subscriptions for a connection
*
* @param connection - The connection to clean up
*/
cleanupConnection(connection: Connection): void {
const connectionId = connection.id;
this.recordSubscriptions.forEach((subscribers, recordId) => {
if (subscribers.has(connectionId)) {
subscribers.delete(connectionId);
if (subscribers.size === 0) {
this.recordSubscriptions.delete(recordId);
}
}
});
}
/**
* Gets all record subscriptions
*
* @returns The record subscriptions map
*/
getRecordSubscriptions(): Map<string, Map<string, "patch" | "full">> {
return this.recordSubscriptions;
}
}

View File

@ -0,0 +1,120 @@
import { Redis, type RedisOptions } from "ioredis";
export class RedisManager {
private _redis: Redis | null = null;
private _pubClient: Redis | null = null;
private _subClient: Redis | null = null;
private _isShuttingDown = false;
private _options: RedisOptions | null = null;
/**
* Initializes Redis connections with the provided options
*
* @param options - Redis connection options
* @param onError - Error handler callback
*/
initialize(options: RedisOptions, onError: (err: Error) => void): void {
this._options = options;
this._redis = new Redis({
retryStrategy: (times: number) => {
if (this._isShuttingDown) {
return null;
}
if (times > 10) {
return null;
}
return Math.min(1000 * Math.pow(2, times), 30000);
},
...options,
});
this._redis.on("error", (err) => {
onError(new Error(`Redis error: ${err}`));
});
this._pubClient = this._redis.duplicate();
this._subClient = this._redis.duplicate();
}
/**
* Gets the main Redis client
*
* @returns The Redis client
* @throws Error if Redis is not initialized
*/
get redis(): Redis {
if (!this._redis) {
throw new Error("Redis not initialized");
}
return this._redis;
}
/**
* Gets the Redis client for publishing
*
* @returns The publishing Redis client
* @throws Error if Redis is not initialized
*/
get pubClient(): Redis {
if (!this._pubClient) {
throw new Error("Redis pub client not initialized");
}
return this._pubClient;
}
/**
* Gets the Redis client for subscribing
*
* @returns The subscribing Redis client
* @throws Error if Redis is not initialized
*/
get subClient(): Redis {
if (!this._subClient) {
throw new Error("Redis sub client not initialized");
}
return this._subClient;
}
/**
* Disconnects all Redis clients
*/
disconnect(): void {
this._isShuttingDown = true;
if (this._pubClient) {
this._pubClient.disconnect();
this._pubClient = null;
}
if (this._subClient) {
this._subClient.disconnect();
this._subClient = null;
}
if (this._redis) {
this._redis.disconnect();
this._redis = null;
}
}
/**
* Checks if Redis is shutting down
*
* @returns true if Redis is shutting down, false otherwise
*/
get isShuttingDown(): boolean {
return this._isShuttingDown;
}
/**
* Sets the shutting down state
*
* @param value - The new shutting down state
*/
set isShuttingDown(value: boolean) {
this._isShuttingDown = value;
}
}

View File

@ -1,5 +1,5 @@
import Redis from "ioredis";
import type { Connection } from "./connection";
import type { Connection } from "../connection";
export class RoomManager {
private redis: Redis;

View File

@ -0,0 +1,21 @@
import type { Connection } from "./connection";
import type { MeshServer } from "./mesh-server";
export class MeshContext<T = any> {
server: MeshServer;
command: string;
connection: Connection;
payload: T;
constructor(
server: MeshServer,
command: string,
connection: Connection,
payload: T
) {
this.server = server;
this.command = command;
this.connection = connection;
this.payload = payload;
}
}

View File

@ -0,0 +1,770 @@
import { IncomingMessage } from "node:http";
import { v4 as uuidv4 } from "uuid";
import { WebSocketServer } from "ws";
import { Status } from "../client";
import { parseCommand } from "../common/message";
import { Connection } from "./connection";
import { MeshContext } from "./mesh-context";
import { ConnectionManager } from "./managers/connection";
import { PresenceManager } from "./managers/presence";
import { RecordManager } from "./managers/record";
import { RoomManager } from "./managers/room";
import { BroadcastManager } from "./managers/broadcast";
import { ChannelManager } from "./managers/channel";
import { CommandManager } from "./managers/command";
import { PubSubManager } from "./managers/pubsub";
import { RecordSubscriptionManager } from "./managers/record-subscription";
import { RedisManager } from "./managers/redis";
import type {
ChannelPattern,
MeshServerOptions,
SocketMiddleware,
} from "./types";
import { PUB_SUB_CHANNEL_PREFIX } from "./utils/constants";
export class MeshServer extends WebSocketServer {
readonly instanceId: string;
private redisManager: RedisManager;
private commandManager: CommandManager;
private channelManager: ChannelManager;
private pubSubManager: PubSubManager;
private recordSubscriptionManager: RecordSubscriptionManager;
private broadcastManager: BroadcastManager;
roomManager: RoomManager;
recordManager: RecordManager;
connectionManager: ConnectionManager;
presenceManager: PresenceManager;
serverOptions: MeshServerOptions;
status: Status = Status.OFFLINE;
private _listening = false;
get listening(): boolean {
return this._listening;
}
set listening(value: boolean) {
this._listening = value;
this.status = value ? Status.ONLINE : Status.OFFLINE;
}
constructor(opts: MeshServerOptions) {
super(opts);
this.instanceId = uuidv4();
this.serverOptions = {
...opts,
pingInterval: opts.pingInterval ?? 30_000,
latencyInterval: opts.latencyInterval ?? 5_000,
maxMissedPongs: opts.maxMissedPongs ?? 1,
};
this.redisManager = new RedisManager();
this.redisManager.initialize(opts.redisOptions, (err) =>
this.emit("error", err)
);
this.roomManager = new RoomManager(this.redisManager.redis);
this.recordManager = new RecordManager(this.redisManager.redis);
this.connectionManager = new ConnectionManager(
this.redisManager.pubClient,
this.instanceId,
this.roomManager
);
this.presenceManager = new PresenceManager(
this.redisManager.redis,
this.roomManager
);
this.commandManager = new CommandManager((err) => this.emit("error", err));
this.channelManager = new ChannelManager(
this.redisManager.redis,
this.redisManager.pubClient,
this.redisManager.subClient,
(err) => this.emit("error", err)
);
this.recordSubscriptionManager = new RecordSubscriptionManager(
this.redisManager.pubClient,
this.recordManager,
(err) => this.emit("error", err)
);
this.pubSubManager = new PubSubManager(
this.redisManager.subClient,
this.instanceId,
this.connectionManager,
this.recordSubscriptionManager.getRecordSubscriptions(),
this.channelManager.getSubscribers.bind(this.channelManager),
(err) => this.emit("error", err)
);
this.broadcastManager = new BroadcastManager(
this.connectionManager,
this.roomManager,
this.instanceId,
this.redisManager.pubClient,
(instanceId) => `${PUB_SUB_CHANNEL_PREFIX}${instanceId}`,
(err) => this.emit("error", err)
);
this.on("listening", () => {
this.listening = true;
});
this.on("error", (err) => {
console.error(`[MeshServer] Error: ${err}`);
});
this.on("close", () => {
this.listening = false;
});
this.pubSubManager.subscribeToInstanceChannel();
this.registerBuiltinCommands();
this.registerRecordCommands();
this.applyListeners();
}
/**
* Waits until the service is ready by ensuring it is listening and the instance channel subscription is established.
*
* @returns {Promise<void>} A promise that resolves when the service is fully ready.
* @throws {Error} If the readiness process fails or if any awaited promise rejects.
*/
async ready(): Promise<void> {
const listeningPromise = this.listening
? Promise.resolve()
: new Promise<void>((resolve) => this.once("listening", resolve));
await Promise.all([
listeningPromise,
this.pubSubManager.getSubscriptionPromise(),
]);
}
private applyListeners() {
this.on("connection", async (socket, req: IncomingMessage) => {
const connection = new Connection(socket, req, this.serverOptions);
connection.on("message", (buffer: Buffer) => {
try {
const data = buffer.toString();
const command = parseCommand(data);
if (
command.id !== undefined &&
!["latency:response", "pong"].includes(command.command)
) {
this.commandManager.runCommand(
command.id,
command.command,
command.payload,
connection,
this
);
}
} catch (err) {
this.emit("error", err);
}
});
try {
await this.connectionManager.registerConnection(connection);
} catch (error) {
connection.close();
return;
}
this.emit("connected", connection);
connection.on("close", async () => {
await this.cleanupConnection(connection);
this.emit("disconnected", connection);
});
connection.on("error", (err) => {
this.emit("clientError", err, connection);
});
connection.on("pong", async (connectionId) => {
try {
const rooms = await this.roomManager.getRoomsForConnection(
connectionId
);
for (const roomName of rooms) {
if (await this.presenceManager.isRoomTracked(roomName)) {
await this.presenceManager.refreshPresence(
connectionId,
roomName
);
}
}
} catch (err) {
this.emit("error", new Error(`Failed to refresh presence: ${err}`));
}
});
});
}
// #region Command Management
/**
* Registers a command with an associated callback and optional middleware.
*
* @template T The type for `MeshContext.payload`. Defaults to `any`.
* @template U The command's return value type. Defaults to `any`.
* @param {string} command - The unique identifier for the command to register.
* @param {(context: MeshContext<T>) => Promise<U> | U} callback - The function to execute when the command is invoked. It receives a `MeshContext` of type `T` and may return a value of type `U` or a `Promise` resolving to `U`.
* @param {SocketMiddleware[]} [middlewares=[]] - An optional array of middleware functions to apply to the command. Defaults to an empty array.
* @throws {Error} May throw an error if the command registration or middleware addition fails.
*/
registerCommand<T = any, U = any>(
command: string,
callback: (context: MeshContext<T>) => Promise<U> | U,
middlewares: SocketMiddleware[] = []
) {
this.commandManager.registerCommand(command, callback, middlewares);
}
/**
* Adds one or more middleware functions to the global middleware stack.
*
* @param {SocketMiddleware[]} middlewares - An array of middleware functions to be added. Each middleware
* is expected to conform to the `SocketMiddleware` type.
* @returns {void}
* @throws {Error} If the provided middlewares are not valid or fail validation (if applicable).
*/
addMiddleware(...middlewares: SocketMiddleware[]): void {
this.commandManager.addMiddleware(...middlewares);
}
/**
* Adds an array of middleware functions to a specific command.
*
* @param {string} command - The name of the command to associate the middleware with.
* @param {SocketMiddleware[]} middlewares - An array of middleware functions to be added to the command.
* @returns {void}
*/
addMiddlewareToCommand(
command: string,
middlewares: SocketMiddleware[]
): void {
this.commandManager.addMiddlewareToCommand(command, middlewares);
}
// #endregion
// #region Channel Management
/**
* Exposes a channel for external access and optionally associates a guard function
* to control access to that channel. The guard function determines whether a given
* connection is permitted to access the channel.
*
* @param {ChannelPattern} channel - The channel or pattern to expose.
* @param {(connection: Connection, channel: string) => Promise<boolean> | boolean} [guard] -
* Optional guard function that receives the connection and channel name, returning
* a boolean or a promise that resolves to a boolean indicating whether access is allowed.
* @returns {void}
*/
exposeChannel(
channel: ChannelPattern,
guard?: (
connection: Connection,
channel: string
) => Promise<boolean> | boolean
): void {
this.channelManager.exposeChannel(channel, guard);
}
/**
* Publishes a message to a specified channel and optionally maintains a history of messages.
*
* @param {string} channel - The name of the channel to which the message will be published.
* @param {any} message - The message to be published. Will not be stringified automatically for you. You need to do that yourself.
* @param {number} [history=0] - The number of historical messages to retain for the channel. Defaults to 0, meaning no history is retained.
* If greater than 0, the message will be added to the channel's history and the history will be trimmed to the specified size.
* @returns {Promise<void>} A Promise that resolves once the message has been published and, if applicable, the history has been updated.
* @throws {Error} This function may throw an error if the underlying `pubClient` operations (e.g., `lpush`, `ltrim`, `publish`) fail.
*/
async publishToChannel(
channel: string,
message: any,
history: number = 0
): Promise<void> {
return this.channelManager.publishToChannel(channel, message, history);
}
// #endregion
// #region Record Management
/**
* Exposes a record or pattern for client subscriptions, optionally adding a guard function.
*
* @param {ChannelPattern} recordPattern - The record ID or pattern to expose.
* @param {(connection: Connection, recordId: string) => Promise<boolean> | boolean} [guard] - Optional guard function.
*/
exposeRecord(
recordPattern: ChannelPattern,
guard?: (
connection: Connection,
recordId: string
) => Promise<boolean> | boolean
): void {
this.recordSubscriptionManager.exposeRecord(recordPattern, guard);
}
/**
* Exposes a record or pattern for client writes, optionally adding a guard function.
*
* @param {ChannelPattern} recordPattern - The record ID or pattern to expose as writable.
* @param {(connection: Connection, recordId: string) => Promise<boolean> | boolean} [guard] - Optional guard function.
*/
exposeWritableRecord(
recordPattern: ChannelPattern,
guard?: (
connection: Connection,
recordId: string
) => Promise<boolean> | boolean
): void {
this.recordSubscriptionManager.exposeWritableRecord(recordPattern, guard);
}
/**
* Updates a record, persists it to Redis, increments its version, computes a patch,
* and publishes the update via Redis pub/sub.
*
* @param {string} recordId - The ID of the record to update.
* @param {any} newValue - The new value for the record.
* @returns {Promise<void>}
* @throws {Error} If the update fails.
*/
async publishRecordUpdate(recordId: string, newValue: any): Promise<void> {
return this.recordSubscriptionManager.publishRecordUpdate(
recordId,
newValue
);
}
// #endregion
// #region Room Management
async isInRoom(roomName: string, connection: Connection | string) {
const connectionId =
typeof connection === "string" ? connection : connection.id;
return this.roomManager.connectionIsInRoom(roomName, connectionId);
}
async addToRoom(roomName: string, connection: Connection | string) {
const connectionId =
typeof connection === "string" ? connection : connection.id;
await this.roomManager.addToRoom(roomName, connection);
if (await this.presenceManager.isRoomTracked(roomName)) {
await this.presenceManager.markOnline(connectionId, roomName);
}
}
async removeFromRoom(roomName: string, connection: Connection | string) {
const connectionId =
typeof connection === "string" ? connection : connection.id;
if (await this.presenceManager.isRoomTracked(roomName)) {
await this.presenceManager.markOffline(connectionId, roomName);
}
return this.roomManager.removeFromRoom(roomName, connection);
}
async removeFromAllRooms(connection: Connection | string) {
return this.roomManager.removeFromAllRooms(connection);
}
async clearRoom(roomName: string) {
return this.roomManager.clearRoom(roomName);
}
async getRoomMembers(roomName: string): Promise<string[]> {
return this.roomManager.getRoomConnectionIds(roomName);
}
// #endregion
// #region Broadcasting
/**
* Broadcasts a command and payload to a set of connections or all available connections.
*
* @param {string} command - The command to be broadcasted.
* @param {any} payload - The data associated with the command.
* @param {Connection[]=} connections - (Optional) A specific list of connections to broadcast to. If not provided, the command will be sent to all connections.
*
* @throws {Error} Emits an "error" event if broadcasting fails.
*/
async broadcast(command: string, payload: any, connections?: Connection[]) {
return this.broadcastManager.broadcast(command, payload, connections);
}
/**
* Broadcasts a command and associated payload to all active connections within the specified room.
*
* @param {string} roomName - The name of the room whose connections will receive the broadcast.
* @param {string} command - The command to be broadcasted to the connections.
* @param {unknown} payload - The data payload associated with the command.
* @returns {Promise<void>} A promise that resolves when the broadcast operation is complete.
* @throws {Error} If the broadcast operation fails, an error is thrown and the promise is rejected.
*/
async broadcastRoom(
roomName: string,
command: string,
payload: any
): Promise<void> {
return this.broadcastManager.broadcastRoom(roomName, command, payload);
}
/**
* Broadcasts a command and payload to all active connections except for the specified one(s).
* Excludes the provided connection(s) from receiving the broadcast.
*
* @param {string} command - The command to broadcast to connections.
* @param {any} payload - The payload to send along with the command.
* @param {Connection | Connection[]} exclude - A single connection or an array of connections to exclude from the broadcast.
* @returns {Promise<void>} A promise that resolves when the broadcast is complete.
* @emits {Error} Emits an "error" event if broadcasting the command fails.
*/
async broadcastExclude(
command: string,
payload: any,
exclude: Connection | Connection[]
): Promise<void> {
return this.broadcastManager.broadcastExclude(command, payload, exclude);
}
/**
* Broadcasts a command with a payload to all connections in a specified room,
* excluding one or more given connections. If the broadcast fails, emits an error event.
*
* @param {string} roomName - The name of the room to broadcast to.
* @param {string} command - The command to broadcast.
* @param {any} payload - The payload to send with the command.
* @param {Connection | Connection[]} exclude - A connection or array of connections to exclude from the broadcast.
* @returns {Promise<void>} A promise that resolves when the broadcast is complete.
* @emits {Error} Emits an error event if broadcasting fails.
*/
async broadcastRoomExclude(
roomName: string,
command: string,
payload: any,
exclude: Connection | Connection[]
): Promise<void> {
return this.broadcastManager.broadcastRoomExclude(
roomName,
command,
payload,
exclude
);
}
// #endregion
// #region Presence Management
trackPresence(
roomPattern: string | RegExp,
guardOrOptions?:
| ((
connection: Connection,
roomName: string
) => Promise<boolean> | boolean)
| {
ttl?: number;
guard?: (
connection: Connection,
roomName: string
) => Promise<boolean> | boolean;
}
): void {
this.presenceManager.trackRoom(roomPattern, guardOrOptions);
}
// #endregion
// #region Command Registration
private registerBuiltinCommands() {
this.registerCommand<
{ channel: string; historyLimit?: number },
{ success: boolean; history?: string[] }
>("mesh/subscribe-channel", async (ctx) => {
const { channel, historyLimit } = ctx.payload;
if (
!(await this.channelManager.isChannelExposed(channel, ctx.connection))
) {
return { success: false, history: [] };
}
try {
if (!this.channelManager.getSubscribers(channel)) {
await this.channelManager.subscribeToRedisChannel(channel);
}
this.channelManager.addSubscription(channel, ctx.connection);
let history: string[] = [];
if (historyLimit && historyLimit > 0) {
history = await this.channelManager.getChannelHistory(
channel,
historyLimit
);
}
return {
success: true,
history,
};
} catch (e) {
return { success: false, history: [] };
}
});
this.registerCommand<{ channel: string }, boolean>(
"mesh/unsubscribe-channel",
async (ctx) => {
const { channel } = ctx.payload;
const wasSubscribed = this.channelManager.removeSubscription(
channel,
ctx.connection
);
if (wasSubscribed && !this.channelManager.getSubscribers(channel)) {
await this.channelManager.unsubscribeFromRedisChannel(channel);
}
return wasSubscribed;
}
);
this.registerCommand<
{ roomName: string },
{ success: boolean; present: string[] }
>("mesh/join-room", async (ctx) => {
const { roomName } = ctx.payload;
await this.addToRoom(roomName, ctx.connection);
const present = await this.presenceManager.getPresentConnections(
roomName
);
return { success: true, present };
});
this.registerCommand<{ roomName: string }, { success: boolean }>(
"mesh/leave-room",
async (ctx) => {
const { roomName } = ctx.payload;
await this.removeFromRoom(roomName, ctx.connection);
return { success: true };
}
);
}
private registerRecordCommands() {
this.registerCommand<
{ recordId: string; mode?: "patch" | "full" },
{ success: boolean; record?: any; version?: number }
>("mesh/subscribe-record", async (ctx) => {
const { recordId, mode = "full" } = ctx.payload;
const connectionId = ctx.connection.id;
if (
!(await this.recordSubscriptionManager.isRecordExposed(
recordId,
ctx.connection
))
) {
return { success: false };
}
try {
const { record, version } =
await this.recordManager.getRecordAndVersion(recordId);
this.recordSubscriptionManager.addSubscription(
recordId,
connectionId,
mode
);
return { success: true, record, version };
} catch (e) {
console.error(`Failed to subscribe to record ${recordId}:`, e);
return { success: false };
}
});
this.registerCommand<{ recordId: string }, boolean>(
"mesh/unsubscribe-record",
async (ctx) => {
const { recordId } = ctx.payload;
const connectionId = ctx.connection.id;
return this.recordSubscriptionManager.removeSubscription(
recordId,
connectionId
);
}
);
this.registerCommand<
{ recordId: string; newValue: any },
{ success: boolean }
>("mesh/publish-record-update", async (ctx) => {
const { recordId, newValue } = ctx.payload;
if (
!(await this.recordSubscriptionManager.isRecordWritable(
recordId,
ctx.connection
))
) {
throw new Error(
`Record "${recordId}" is not writable by this connection.`
);
}
try {
await this.publishRecordUpdate(recordId, newValue);
return { success: true };
} catch (e: any) {
throw new Error(
`Failed to publish update for record "${recordId}": ${e.message}`
);
}
});
this.registerCommand<
{ roomName: string },
{ success: boolean; present: string[] }
>("mesh/subscribe-presence", async (ctx) => {
const { roomName } = ctx.payload;
if (
!(await this.presenceManager.isRoomTracked(roomName, ctx.connection))
) {
return { success: false, present: [] };
}
try {
const presenceChannel = `mesh:presence:updates:${roomName}`;
if (!this.channelManager.getSubscribers(presenceChannel)) {
this.channelManager.addSubscription(presenceChannel, ctx.connection);
}
const present = await this.presenceManager.getPresentConnections(
roomName
);
return { success: true, present };
} catch (e) {
console.error(
`Failed to subscribe to presence for room ${roomName}:`,
e
);
return { success: false, present: [] };
}
});
this.registerCommand<{ roomName: string }, boolean>(
"mesh/unsubscribe-presence",
async (ctx) => {
const { roomName } = ctx.payload;
const presenceChannel = `mesh:presence:updates:${roomName}`;
return this.channelManager.removeSubscription(
presenceChannel,
ctx.connection
);
}
);
}
// #endregion
private async cleanupConnection(connection: Connection) {
connection.stopIntervals();
try {
await this.presenceManager.cleanupConnection(connection);
await this.connectionManager.cleanupConnection(connection);
await this.roomManager.cleanupConnection(connection);
this.recordSubscriptionManager.cleanupConnection(connection);
this.channelManager.cleanupConnection(connection);
} catch (err) {
this.emit("error", new Error(`Failed to clean up connection: ${err}`));
}
}
/**
* Gracefully closes all active connections, cleans up resources,
* and shuts down the service. Optionally accepts a callback function
* that will be invoked once shutdown is complete or if an error occurs.
*
* @param {((err?: Error) => void)=} callback - Optional callback to be invoked when closing is complete or if an error occurs.
* @returns {Promise<void>} A promise that resolves when shutdown is complete.
* @throws {Error} If an error occurs during shutdown, the promise will be rejected with the error.
*/
async close(callback?: (err?: Error) => void): Promise<void> {
this.redisManager.isShuttingDown = true;
const connections = Object.values(
this.connectionManager.getLocalConnections()
);
await Promise.all(
connections.map(async (connection) => {
if (!connection.isDead) {
await connection.close();
}
await this.cleanupConnection(connection);
})
);
await new Promise<void>((resolve, reject) => {
super.close((err?: Error) => {
if (err) reject(err);
else resolve();
});
});
this.redisManager.disconnect();
this.listening = false;
this.removeAllListeners();
if (callback) {
callback();
}
}
/**
* Registers a callback function to be executed when a new connection is established.
*
* @param {(connection: Connection) => Promise<void> | void} callback - The function to execute when a new connection is established.
* @returns {MeshServer} The server instance for method chaining.
*/
onConnection(
callback: (connection: Connection) => Promise<void> | void
): MeshServer {
this.on("connected", callback);
return this;
}
/**
* Registers a callback function to be executed when a connection is closed.
*
* @param {(connection: Connection) => Promise<void> | void} callback - The function to execute when a connection is closed.
* @returns {MeshServer} The server instance for method chaining.
*/
onDisconnection(
callback: (connection: Connection) => Promise<void> | void
): MeshServer {
this.on("disconnected", callback);
return this;
}
}

View File

@ -0,0 +1,55 @@
import type { ServerOptions } from "ws";
import type { RedisOptions } from "ioredis";
import type { Operation } from "fast-json-patch";
import type { Connection } from "./connection";
import type { Command } from "../common/message";
import type { MeshContext } from "./mesh-context";
export type SocketMiddleware = (
context: MeshContext<any>
) => any | Promise<any>;
export type PubSubMessagePayload = {
targetConnectionIds: string[];
command: Command;
};
export type RecordUpdatePubSubPayload = {
recordId: string;
newValue?: any;
patch?: Operation[];
version: number;
};
export type MeshServerOptions = ServerOptions & {
/**
* The interval at which to send ping messages to the client.
*
* @default 30000
*/
pingInterval?: number;
/**
* The interval at which to send both latency requests and updates to the client.
*
* @default 5000
*/
latencyInterval?: number;
redisOptions: RedisOptions;
/**
* The maximum number of consecutive ping intervals the server will wait
* for a pong response before considering the client disconnected.
* A value of 1 means the client must respond within roughly 2 * pingInterval
* before being disconnected. Setting it to 0 is not recommended as it will
* immediately disconnect the client if it doesn't respond to the first ping in
* exactly `pingInterval` milliseconds, which doesn't provide wiggle room for
* network latency.
*
* @see pingInterval
* @default 1
*/
maxMissedPongs?: number;
};
export type ChannelPattern = string | RegExp;

View File

@ -0,0 +1,2 @@
export const PUB_SUB_CHANNEL_PREFIX = "mesh:pubsub:";
export const RECORD_PUB_SUB_CHANNEL = "mesh:record-updates";

View File

@ -48,13 +48,6 @@ describe("MeshServer", () => {
await server.close();
});
test("should create a server instance", () => {
expect(server).toBeInstanceOf(MeshServer);
expect(server.redis).toBeInstanceOf(Redis);
expect(server.roomManager).toBeDefined();
expect(server.connectionManager).toBeDefined();
});
test("clients can connect to the server", async () => {
await clientA.connect();
expect(clientA.status).toBe(Status.ONLINE);