feature: record subscription

This commit is contained in:
nvms 2025-04-17 20:52:01 -04:00
parent 0133f59e39
commit 7c2850db27
9 changed files with 951 additions and 24 deletions

View File

@ -2,6 +2,29 @@
Mesh is a command-based WebSocket server and client framework designed for scalable, multi-instance deployments. It uses Redis to coordinate connections, rooms, and metadata across servers, enabling reliable horizontal scaling. Mesh includes built-in ping/latency tracking, automatic reconnection, and a simple command API for clean, asynchronous, RPC-like communication.
* [Quickstart](#quickstart)
* [Server](#server)
* [Client](#client)
* [Distributed Messaging Architecture](#distributed-messaging-architecture)
* [Redis Channel Subscriptions](#redis-channel-subscriptions)
* [Server Configuration](#server-configuration)
* [Server Publishing](#server-publishing)
* [Client Usage](#client-usage)
* [Metadata](#metadata)
* [Room Metadata](#room-metadata)
* [Record Subscriptions](#record-subscriptions)
* [Server Configuration](#server-configuration-1)
* [Updating Records](#updating-records)
* [Client Usage — Full Mode (default)](#client-usage--full-mode-default)
* [Client Usage — Patch Mode](#client-usage--patch-mode)
* [Unsubscribing](#unsubscribing)
* [Versioning and Resync](#versioning-and-resync)
* [Command Middleware](#command-middleware)
* [Latency Tracking and Connection Liveness](#latency-tracking-and-connection-liveness)
* [Server-Side Configuration](#server-side-configuration)
* [Client-Side Configuration](#client-side-configuration)
* [Comparison](#comparison)
## Quickstart
### Server
@ -79,7 +102,7 @@ client.on("user-joined", (event) => {
await client.close();
```
## Room Communication Flow
## Distributed Messaging Architecture
The diagram below illustrates how Mesh handles communication across multiple server instances. It uses Redis to look up which connections belong to a room, determine their host instances, and routes messages accordingly — either locally or via pub/sub.
@ -261,7 +284,109 @@ const allRoomMeta = await server.roomManager.getAllMetadata();
Room metadata is removed when `clearRoom(roomName)` is called.
### Command Middleware
## Record Subscriptions
Mesh supports subscribing to individual records stored in Redis. When a record changes, clients receive either the full value or a JSON patch describing the update—depending on the selected mode (`full` or `patch`).
Subscriptions are multi-instance aware, versioned for integrity, and efficient at scale. Each connected client can independently choose its preferred mode.
### Server Configuration
Expose records using exact IDs or regex patterns. You can add optional per-client guard logic:
```ts
server.exposeRecord("user:123");
server.exposeRecord(/^product:\d+$/);
server.exposeRecord(/^private:.+$/, async (conn, recordId) => {
const meta = await server.connectionManager.getMetadata(conn);
return !!meta?.userId;
});
```
### Updating Records
Use `publishRecordUpdate()` to update the stored value, increment the version, generate a patch, and broadcast to all subscribed clients.
```ts
await server.publishRecordUpdate("user:123", {
name: "Alice",
email: "alice@example.com",
});
// later...
await server.publishRecordUpdate("user:123", {
name: "Alice",
email: "alice@updated.com",
status: "active",
});
```
### Client Usage — Full Mode (default)
In `full` mode, the client receives the entire updated record every time. This is simpler to use and ideal for small records or when patching isn't needed.
```ts
let userProfile = {};
const { success, record, version } = await client.subscribeRecord(
"user:123",
(update) => {
userProfile = update.full;
console.log(`Received full update v${update.version}:`, update.full);
}
);
if (success) {
userProfile = record;
}
```
### Client Usage — Patch Mode
In `patch` mode, the client receives only changes as JSON patches and must apply them locally. This is especially useful for large records that only change in small ways over time.
```ts
import { applyPatch } from "@prsm/mesh/client";
let productData = {};
const { success, record, version } = await client.subscribeRecord(
"product:456",
(update) => {
if (update.patch) {
// normally youll receive `patch`, but if the client falls out of sync,
// the server will send a full update instead to resynchronize.
applyPatch(productData, update.patch);
console.log(`Applied patch v${update.version}`);
} else {
productData = update.full;
console.log(`Received full (resync) v${update.version}`);
}
},
{ mode: "patch" }
);
if (success) {
productData = record;
}
```
### Unsubscribing
```ts
await client.unsubscribeRecord("user:123");
await client.unsubscribeRecord("product:456");
```
### Versioning and Resync
Every update includes a `version`. Clients should track the current version and, in `patch` mode, expect `version === localVersion + 1`. If a gap is detected (missed patch), the client will automatically be sent a full record update to resync.
This system allows fine-grained, real-time synchronization of distributed state with minimal overhead.
## Command Middleware
Mesh allows you to define middleware functions that run before your command handlers. This is useful for tasks like authentication, validation, logging, or modifying the context before the main command logic executes.
@ -294,10 +419,10 @@ Applied only to the specified command, running _after_ any global middleware.
```ts
const validateProfileUpdate = async (ctx) => {
const { name, email } = ctx.payload;
if (typeof name !== 'string' || name.length === 0) {
if (typeof name !== "string" || name.length === 0) {
throw new Error("Invalid name");
}
if (typeof email !== 'string' || !email.includes('@')) {
if (typeof email !== "string" || !email.includes("@")) {
throw new Error("Invalid email");
}
};
@ -371,6 +496,7 @@ Together, this system provides end-to-end connection liveness guarantees without
| **Automatic Reconnect** | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
| **Redis Pub/Sub** | ✅ Client subscription | ⚠️ Server-side only | ❌ | ✅ | ❌ | ❌ |
| **History on Subscribe** | ✅ Optional Redis-backed | ❌ | ❌ | ⚠️ Streams only | ⚠️ DIY | ❌ |
| **Record Subscriptions** | ✅ Versioned + Patchable | ❌ | ❌ | ⚠️ Raw records | ❌ | ❌ |
| **Typescript-First** | ✅ Yes, mostly | ⚠️ Mixed | ✅ | ⚠️ | ⚠️ | ❌ |
| **Scalability** | ✅ Horizontal via Redis | ✅ Horizontal via Redis Adapter | ✅ | ✅ | ⚠️ Manual | ✅ But no sync |
| **Target Use Case** | Real-time/generic async | Real-time apps, chat | Multiplayer games | Pub/Sub, IoT | Anything (low-level) | Anything (perf-focused) |

View File

@ -5,6 +5,7 @@
"name": "keepalive-multi",
"dependencies": {
"deasync": "^0.1.30",
"fast-json-patch": "^3.1.1",
"ioredis": "^5.6.1",
"uuid": "^11.1.0",
"ws": "^8.18.1",
@ -213,6 +214,8 @@
"expect-type": ["expect-type@1.2.1", "", {}, "sha512-/kP8CAwxzLVEeFrMm4kMmy4CCDlpipyA7MYLVrdJIkV0fYF0UaigQHRsxHiuY/GEea+bh4KSv3TIlgr+2UL6bw=="],
"fast-json-patch": ["fast-json-patch@3.1.1", "", {}, "sha512-vf6IHUX2SBcA+5/+4883dsIjpBTqmfBjmYiWK1savxQmFk4JfBMLa7ynTYOs1Rolp/T1betJxHiGD3g1Mn8lUQ=="],
"fdir": ["fdir@6.4.3", "", { "peerDependencies": { "picomatch": "^3 || ^4" }, "optionalPeers": ["picomatch"] }, "sha512-PMXmW2y1hDDfTSRc9gaXIuCCRpuoz3Kaz8cUelp3smouvfT632ozg2vrT6lJsHKKOF59YLbOGfAWGUcKEfRMQw=="],
"file-uri-to-path": ["file-uri-to-path@1.0.0", "", {}, "sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw=="],

View File

@ -33,6 +33,7 @@
},
"dependencies": {
"deasync": "^0.1.30",
"fast-json-patch": "^3.1.1",
"ioredis": "^5.6.1",
"uuid": "^11.1.0",
"ws": "^8.18.1"

View File

@ -4,8 +4,10 @@ import { WebSocket } from "ws";
import { CodeError } from "../common/codeerror";
import { Status } from "../common/status";
import { Connection } from "./connection";
import type { Operation } from "fast-json-patch";
export { Status } from "../common/status";
export { applyPatch } from "fast-json-patch";
export type MeshClientOptions = Partial<{
/**
@ -60,6 +62,18 @@ export class MeshClient extends EventEmitter {
options: Required<MeshClientOptions>;
isReconnecting = false;
private _status: Status = Status.OFFLINE;
private recordSubscriptions: Map<
string, // recordId
{
callback: (update: {
full?: any;
patch?: Operation[];
version: number;
}) => void | Promise<void>;
localVersion: number;
mode: "patch" | "full";
}
> = new Map();
constructor(url: string, opts: MeshClientOptions = {}) {
super();
@ -82,17 +96,31 @@ export class MeshClient extends EventEmitter {
private setupConnectionEvents(): void {
this.connection.on("message", (data) => {
this.emit("message", data);
// data is the parsed command object
this.emit("message", data); // Emit generic message event
const systemCommands = [
"ping",
"pong",
"latency",
"latency:request",
"latency:response",
];
if (data.command && !systemCommands.includes(data.command)) {
// Handle specific commands first
if (data.command === "record-update") {
this.handleRecordUpdate(data.payload);
// Optionally emit the specific event if needed elsewhere
// this.emit(data.command, data.payload);
} else if (data.command === "subscription-message") {
// Let the specific listener in subscribe() handle this
// Emit it here for the existing subscribe logic to work
this.emit(data.command, data.payload);
} else {
// Handle other non-system commands by emitting events
const systemCommands = [
"ping",
"pong",
"latency",
"latency:request",
"latency:response",
// 'subscription-message' and 'record-update' are handled above
];
if (data.command && !systemCommands.includes(data.command)) {
this.emit(data.command, data.payload);
}
}
});
@ -351,6 +379,47 @@ export class MeshClient extends EventEmitter {
return result;
}
private async handleRecordUpdate(payload: {
recordId: string;
full?: any;
patch?: Operation[];
version: number;
}) {
const { recordId, full, patch, version } = payload;
const subscription = this.recordSubscriptions.get(recordId);
if (!subscription) {
return;
}
if (patch) {
if (version !== subscription.localVersion + 1) {
// desync
console.warn(
`[MeshClient] Desync detected for record ${recordId}. Expected version ${
subscription.localVersion + 1
}, got ${version}. Resubscribing to request full record.`
);
// unsubscribe and resubscribe to force a full update
await this.unsubscribeRecord(recordId);
await this.subscribeRecord(recordId, subscription.callback, {
mode: subscription.mode,
});
return;
}
subscription.localVersion = version;
await subscription.callback({ patch, version });
return;
}
if (full !== undefined) {
subscription.localVersion = version;
await subscription.callback({ full, version });
}
}
/**
* Subscribes to a specific channel and registers a callback to be invoked
* whenever a message is received on that channel. Optionally retrieves a
@ -403,4 +472,72 @@ export class MeshClient extends EventEmitter {
unsubscribe(channel: string): Promise<boolean> {
return this.command("unsubscribe-channel", { channel });
}
/**
* Subscribes to a specific record and registers a callback for updates.
*
* @param {string} recordId - The ID of the record to subscribe to.
* @param {(update: { full?: any; patch?: Operation[]; version: number }) => void | Promise<void>} callback - Function called on updates.
* @param {{ mode?: "patch" | "full" }} [options] - Subscription mode ('patch' or 'full', default 'full').
* @returns {Promise<{ success: boolean; record: any | null; version: number }>} Initial state of the record.
*/
async subscribeRecord(
recordId: string,
callback: (update: {
full?: any;
patch?: Operation[];
version: number;
}) => void | Promise<void>,
options?: { mode?: "patch" | "full" }
): Promise<{ success: boolean; record: any | null; version: number }> {
const mode = options?.mode ?? "full";
try {
const result = await this.command("subscribe-record", { recordId, mode });
if (result.success) {
this.recordSubscriptions.set(recordId, {
callback,
localVersion: result.version,
mode,
});
// Immediately call callback with the initial full record
await callback({ full: result.record, version: result.version });
}
return {
success: result.success,
record: result.record ?? null,
version: result.version ?? 0,
};
} catch (error) {
console.error(
`[MeshClient] Failed to subscribe to record ${recordId}:`,
error
);
return { success: false, record: null, version: 0 };
}
}
/**
* Unsubscribes from a specific record.
*
* @param {string} recordId - The ID of the record to unsubscribe from.
* @returns {Promise<boolean>} True if successful, false otherwise.
*/
async unsubscribeRecord(recordId: string): Promise<boolean> {
try {
const success = await this.command("unsubscribe-record", { recordId });
if (success) {
this.recordSubscriptions.delete(recordId);
}
return success;
} catch (error) {
console.error(
`[MeshClient] Failed to unsubscribe from record ${recordId}:`,
error
);
return false;
}
}
}

View File

@ -1,3 +1,3 @@
export { MeshClient, Status } from "./client";
export { MeshClient, Status, applyPatch } from "./client";
export { Connection } from "./connection";
export { CodeError } from "../common/codeerror";

View File

@ -1,3 +1,3 @@
export { MeshClient } from "./client";
export { MeshClient, applyPatch } from "./client";
export { MeshServer, type MeshContext, type SocketMiddleware } from "./server";
export { type CodeError } from "./common/codeerror";

View File

@ -3,12 +3,17 @@ import { v4 as uuidv4 } from "uuid";
import { Redis, type RedisOptions } from "ioredis";
import { WebSocket, WebSocketServer, type ServerOptions } from "ws";
import { RoomManager } from "./room-manager";
import { RecordManager } from "./record-manager";
import { ConnectionManager } from "./connection-manager";
import { CodeError, Status } from "../client";
import { Connection } from "./connection";
import { parseCommand, type Command } from "../common/message";
import type { Operation } from "fast-json-patch";
export { RecordManager }; // Export RecordManager
const PUB_SUB_CHANNEL_PREFIX = "mesh:pubsub:";
const RECORD_PUB_SUB_CHANNEL = "mesh:record-updates";
export class MeshContext<T = any> {
server: MeshServer;
@ -38,6 +43,13 @@ type PubSubMessagePayload = {
command: Command;
};
type RecordUpdatePubSubPayload = {
recordId: string;
newValue?: any;
patch?: Operation[];
version: number;
};
export type MeshServerOptions = ServerOptions & {
/**
* The interval at which to send ping messages to the client.
@ -77,15 +89,25 @@ export class MeshServer extends WebSocketServer {
pubClient: Redis;
subClient: Redis;
roomManager: RoomManager;
recordManager: RecordManager;
connectionManager: ConnectionManager;
serverOptions: MeshServerOptions;
status: Status = Status.OFFLINE;
private exposedChannels: ChannelPattern[] = [];
private exposedRecords: ChannelPattern[] = [];
private channelSubscriptions: { [channel: string]: Set<Connection> } = {};
private recordSubscriptions: Map<
string, // recordId
Map<string, "patch" | "full"> // connectionId -> mode
> = new Map();
private channelGuards: Map<
ChannelPattern,
(connection: Connection, channel: string) => Promise<boolean> | boolean
> = new Map();
private recordGuards: Map<
ChannelPattern,
(connection: Connection, recordId: string) => Promise<boolean> | boolean
> = new Map();
private _isShuttingDown = false;
commands: {
@ -138,6 +160,7 @@ export class MeshServer extends WebSocketServer {
this.subClient = this.redis.duplicate();
this.roomManager = new RoomManager(this.redis);
this.recordManager = new RecordManager(this.redis);
this.connectionManager = new ConnectionManager(
this.pubClient,
this.instanceId,
@ -159,6 +182,7 @@ export class MeshServer extends WebSocketServer {
});
this.registerBuiltinCommands();
this.registerRecordCommands(); // Add this line
this.applyListeners();
}
@ -180,10 +204,13 @@ export class MeshServer extends WebSocketServer {
const channel = `${PUB_SUB_CHANNEL_PREFIX}${this.instanceId}`;
this._subscriptionPromise = new Promise((resolve, reject) => {
this.subClient.subscribe(channel, (err) => {
this.subClient.subscribe(channel, RECORD_PUB_SUB_CHANNEL, (err) => {
if (err) {
if (!this._isShuttingDown) {
console.error(`Failed to subscribe to channel ${channel}:`, err);
console.error(
`Failed to subscribe to channels ${channel}, ${RECORD_PUB_SUB_CHANNEL}:`,
err
);
}
reject(err);
return;
@ -194,13 +221,18 @@ export class MeshServer extends WebSocketServer {
this.subClient.on("message", async (channel, message) => {
if (channel.startsWith(PUB_SUB_CHANNEL_PREFIX)) {
this.handlePubSubMessage(channel, message);
this.handleInstancePubSubMessage(channel, message);
} else if (channel === RECORD_PUB_SUB_CHANNEL) {
this.handleRecordUpdatePubSubMessage(message);
} else if (this.channelSubscriptions[channel]) {
// Handle regular channel subscriptions
for (const connection of this.channelSubscriptions[channel]) {
connection.send({
command: "subscription-message",
payload: { channel, message },
});
if (!connection.isDead) {
connection.send({
command: "subscription-message",
payload: { channel, message },
});
}
}
}
});
@ -208,7 +240,7 @@ export class MeshServer extends WebSocketServer {
return this._subscriptionPromise;
}
private handlePubSubMessage(channel: string, message: string) {
private handleInstancePubSubMessage(channel: string, message: string) {
try {
const parsedMessage = JSON.parse(message) as PubSubMessagePayload;
@ -236,6 +268,51 @@ export class MeshServer extends WebSocketServer {
}
}
private handleRecordUpdatePubSubMessage(message: string) {
try {
const parsedMessage = JSON.parse(message) as RecordUpdatePubSubPayload;
const { recordId, newValue, patch, version } = parsedMessage;
if (!recordId || typeof version !== "number") {
throw new Error("Invalid record update message format");
}
const subscribers = this.recordSubscriptions.get(recordId);
if (!subscribers) {
return; // No local subscribers for this record
}
subscribers.forEach((mode, connectionId) => {
const connection =
this.connectionManager.getLocalConnection(connectionId);
if (connection && !connection.isDead) {
if (mode === "patch" && patch) {
connection.send({
command: "record-update",
payload: { recordId, patch, version },
});
} else if (mode === "full" && newValue !== undefined) {
connection.send({
command: "record-update",
payload: { recordId, full: newValue, version },
});
}
} else if (!connection) {
// Clean up stale subscription if connection no longer exists locally
subscribers.delete(connectionId);
if (subscribers.size === 0) {
this.recordSubscriptions.delete(recordId);
}
}
});
} catch (err) {
this.emit(
"error",
new Error(`Failed to parse record update message: ${message}`)
);
}
}
private applyListeners() {
this.on("connection", async (socket: WebSocket, req: IncomingMessage) => {
const connection = new Connection(socket, req, this.serverOptions);
@ -330,6 +407,51 @@ export class MeshServer extends WebSocketServer {
return true;
}
/**
* Exposes a record or pattern for client subscriptions, optionally adding a guard function.
*
* @param {ChannelPattern} recordPattern - The record ID or pattern to expose.
* @param {(connection: Connection, recordId: string) => Promise<boolean> | boolean} [guard] - Optional guard function.
*/
exposeRecord(
recordPattern: ChannelPattern,
guard?: (
connection: Connection,
recordId: string
) => Promise<boolean> | boolean
): void {
this.exposedRecords.push(recordPattern);
if (guard) {
this.recordGuards.set(recordPattern, guard);
}
}
private async isRecordExposed(
recordId: string,
connection: Connection
): Promise<boolean> {
const matchedPattern = this.exposedRecords.find((pattern) =>
typeof pattern === "string"
? pattern === recordId
: pattern.test(recordId)
);
if (!matchedPattern) {
return false;
}
const guard = this.recordGuards.get(matchedPattern);
if (guard) {
try {
return await Promise.resolve(guard(connection, recordId));
} catch (e) {
return false;
}
}
return true;
}
/**
* Publishes a message to a specified channel and optionally maintains a history of messages.
*
@ -353,6 +475,47 @@ export class MeshServer extends WebSocketServer {
await this.pubClient.publish(channel, message);
}
/**
* Updates a record, persists it to Redis, increments its version, computes a patch,
* and publishes the update via Redis pub/sub.
*
* @param {string} recordId - The ID of the record to update.
* @param {any} newValue - The new value for the record.
* @returns {Promise<void>}
* @throws {Error} If the update fails.
*/
async publishRecordUpdate(recordId: string, newValue: any): Promise<void> {
const updateResult = await this.recordManager.publishUpdate(
recordId,
newValue
);
if (!updateResult) {
return; // No change detected
}
const { patch, version } = updateResult;
const messagePayload: RecordUpdatePubSubPayload = {
recordId,
newValue, // Always include newValue for 'full' subscribers
patch,
version,
};
try {
await this.pubClient.publish(
RECORD_PUB_SUB_CHANNEL,
JSON.stringify(messagePayload)
);
} catch (err) {
this.emit(
"error",
new Error(`Failed to publish record update for "${recordId}": ${err}`)
);
}
}
/**
* Adds one or more middleware functions to the global middleware stack.
*
@ -410,7 +573,6 @@ export class MeshServer extends WebSocketServer {
}
this.channelSubscriptions[channel].add(ctx.connection);
// Fetch channel history if historyLimit is provided
let history: string[] = [];
if (historyLimit && historyLimit > 0) {
const historyKey = `history:${channel}`;
@ -448,6 +610,53 @@ export class MeshServer extends WebSocketServer {
);
}
private registerRecordCommands() {
this.registerCommand<
{ recordId: string; mode?: "patch" | "full" },
{ success: boolean; record?: any; version?: number }
>("subscribe-record", async (ctx) => {
const { recordId, mode = "full" } = ctx.payload;
const connectionId = ctx.connection.id;
if (!(await this.isRecordExposed(recordId, ctx.connection))) {
return { success: false };
}
try {
const { record, version } =
await this.recordManager.getRecordAndVersion(recordId);
if (!this.recordSubscriptions.has(recordId)) {
this.recordSubscriptions.set(recordId, new Map());
}
this.recordSubscriptions.get(recordId)!.set(connectionId, mode);
return { success: true, record, version };
} catch (e) {
console.error(`Failed to subscribe to record ${recordId}:`, e);
return { success: false };
}
});
this.registerCommand<{ recordId: string }, boolean>(
"unsubscribe-record",
async (ctx) => {
const { recordId } = ctx.payload;
const connectionId = ctx.connection.id;
const recordSubs = this.recordSubscriptions.get(recordId);
if (recordSubs?.has(connectionId)) {
recordSubs.delete(connectionId);
if (recordSubs.size === 0) {
this.recordSubscriptions.delete(recordId);
}
return true;
}
return false;
}
);
}
/**
* Adds an array of middleware functions to a specific command.
*
@ -465,6 +674,18 @@ export class MeshServer extends WebSocketServer {
}
}
private async cleanupRecordSubscriptions(connection: Connection) {
const connectionId = connection.id;
this.recordSubscriptions.forEach((subscribers, recordId) => {
if (subscribers.has(connectionId)) {
subscribers.delete(connectionId);
if (subscribers.size === 0) {
this.recordSubscriptions.delete(recordId);
}
}
});
}
private async runCommand(
id: number,
commandName: string,
@ -732,6 +953,7 @@ export class MeshServer extends WebSocketServer {
await this.connectionManager.cleanupConnection(connection);
await this.roomManager.cleanupConnection(connection);
await this.cleanupRecordSubscriptions(connection); // Ensure this line exists
}
/**

View File

@ -0,0 +1,129 @@
import type { Redis } from "ioredis";
import jsonpatch, { type Operation } from "fast-json-patch";
const RECORD_KEY_PREFIX = "mesh:record:";
const RECORD_VERSION_KEY_PREFIX = "mesh:record-version:";
export class RecordManager {
private redis: Redis;
constructor(redis: Redis) {
this.redis = redis;
}
private recordKey(recordId: string): string {
return `${RECORD_KEY_PREFIX}${recordId}`;
}
private recordVersionKey(recordId: string): string {
return `${RECORD_VERSION_KEY_PREFIX}${recordId}`;
}
/**
* Retrieves a record from Redis by its unique identifier. Attempts to parse
* the stored data as JSON before returning. If the record does not exist,
* returns null.
*
* @param {string} recordId - The unique identifier of the record to retrieve.
* @returns {Promise<any | null>} A promise that resolves to the parsed record object,
* or null if the record does not exist.
* @throws {SyntaxError} If the stored data is not valid JSON and cannot be parsed.
* @throws {Error} If an error occurs during the Redis operation.
*/
async getRecord(recordId: string): Promise<any | null> {
const data = await this.redis.get(this.recordKey(recordId));
return data ? JSON.parse(data) : null;
}
/**
* Retrieves the version number associated with the specified record ID from Redis.
* If no version is found, returns 0.
*
* @param {string} recordId - The unique identifier for the record whose version is to be retrieved.
* @returns {Promise<number>} A promise that resolves to the version number of the record. Returns 0 if not found.
* @throws {Error} If there is an issue communicating with Redis or parsing the version.
*/
async getVersion(recordId: string): Promise<number> {
const version = await this.redis.get(this.recordVersionKey(recordId));
return version ? parseInt(version, 10) : 0;
}
/**
* Retrieves a record and its associated version from Redis.
* Fetches both the record data and its version by their respective keys.
*
* @param {string} recordId - The unique identifier for the record to retrieve.
* @returns {Promise<{ record: any | null; version: number }>}
* A promise that resolves to an object containing the parsed record (or null if not found)
* and its version number (0 if version data is not found or invalid).
* @throws {Error} If there is a Redis error or if JSON parsing fails for the record data.
*/
async getRecordAndVersion(
recordId: string
): Promise<{ record: any | null; version: number }> {
const pipeline = this.redis.pipeline();
pipeline.get(this.recordKey(recordId));
pipeline.get(this.recordVersionKey(recordId));
const results = await pipeline.exec();
const recordData = results?.[0]?.[1] as string | null;
const versionData = results?.[1]?.[1] as string | null;
const record = recordData ? JSON.parse(recordData) : null;
const version = versionData ? parseInt(versionData, 10) : 0;
return { record, version };
}
/**
* Publishes an update to a record by computing and applying a JSON Patch,
* incrementing the version, and persisting the updated value and version in Redis.
* If there are no changes between the old and new value, returns null.
*
* @param {string} recordId - The unique identifier of the record to update.
* @param {any} newValue - The new value to set for the record.
* @returns {Promise<{ patch: Operation[]; version: number } | null>}
* A promise resolving to an object containing the JSON Patch operations and new version number,
* or null if there were no changes to publish.
* @throws {Error} If there is a failure reading or writing to Redis, or during patch computation, the promise will be rejected with the error.
*/
async publishUpdate(
recordId: string,
newValue: any
): Promise<{ patch: Operation[]; version: number } | null> {
const recordKey = this.recordKey(recordId);
const versionKey = this.recordVersionKey(recordId);
const { record: oldValue, version: oldVersion } =
await this.getRecordAndVersion(recordId);
const patch = jsonpatch.compare(oldValue ?? {}, newValue ?? {});
if (patch.length === 0) {
return null;
}
const newVersion = oldVersion + 1;
const pipeline = this.redis.pipeline();
pipeline.set(recordKey, JSON.stringify(newValue));
pipeline.set(versionKey, newVersion.toString());
await pipeline.exec();
return { patch, version: newVersion };
}
/**
* Deletes a record and its associated version from Redis storage.
*
* @param {string} recordId - The unique identifier of the record to be deleted.
* @returns {Promise<void>} A promise that resolves when the record and its version have been deleted.
* @throws {Error} If an error occurs during the Redis pipeline execution, the promise will be rejected with the error.
*/
async deleteRecord(recordId: string): Promise<void> {
const pipeline = this.redis.pipeline();
pipeline.del(this.recordKey(recordId));
pipeline.del(this.recordVersionKey(recordId));
await pipeline.exec();
}
}

View File

@ -0,0 +1,309 @@
import { describe, test, expect, beforeEach, afterEach, vi } from "vitest";
import Redis from "ioredis";
import { MeshServer } from "../server";
import { MeshClient } from "../client";
const REDIS_HOST = process.env.REDIS_HOST || "127.0.0.1";
const REDIS_PORT = process.env.REDIS_PORT
? parseInt(process.env.REDIS_PORT, 10)
: 6379;
const createTestServer = (port: number) =>
new MeshServer({
port,
redisOptions: {
host: REDIS_HOST,
port: REDIS_PORT,
},
pingInterval: 1000,
latencyInterval: 500,
});
const flushRedis = async () => {
const redis = new Redis({ host: REDIS_HOST, port: REDIS_PORT });
await redis.flushdb();
await redis.quit();
};
const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
describe("Record Subscription", () => {
const port = 8130;
let server: MeshServer;
let client1: MeshClient;
let client2: MeshClient;
beforeEach(async () => {
await flushRedis();
server = createTestServer(port);
server.exposeRecord(/^test:record:.*/);
server.exposeRecord("guarded:record");
await server.ready();
client1 = new MeshClient(`ws://localhost:${port}`);
client2 = new MeshClient(`ws://localhost:${port}`);
});
afterEach(async () => {
await client1.close();
await client2.close();
await server.close();
});
test("client can subscribe to an exposed record and get initial state", async () => {
const recordId = "test:record:1";
const initialData = { count: 0, name: "initial" };
await server.publishRecordUpdate(recordId, initialData);
await client1.connect();
const callback = vi.fn();
const { success, record, version } = await client1.subscribeRecord(
recordId,
callback
);
expect(success).toBe(true);
expect(version).toBe(1);
expect(record).toEqual(initialData);
// callback is called once initially with the full record
expect(callback).toHaveBeenCalledTimes(1);
expect(callback).toHaveBeenCalledWith({ full: initialData, version: 1 });
});
test("client cannot subscribe to an unexposed record", async () => {
await client1.connect();
const callback = vi.fn();
const { success, record, version } = await client1.subscribeRecord(
"unexposed:record",
callback
);
expect(success).toBe(false);
expect(version).toBe(0);
expect(record).toBeNull();
expect(callback).not.toHaveBeenCalled();
});
test("record guard prevents unauthorized subscriptions", async () => {
await client1.connect();
await client2.connect();
const connections = server.connectionManager.getLocalConnections();
const connection1Id = connections[0]?.id;
server.exposeRecord(
"guarded:record",
(connection, recId) => connection.id === connection1Id
);
const callback1 = vi.fn();
const result1 = await client1.subscribeRecord("guarded:record", callback1);
const callback2 = vi.fn();
const result2 = await client2.subscribeRecord("guarded:record", callback2);
expect(result1.success).toBe(true);
expect(result1.version).toBe(0); // nothing published yet
expect(result1.record).toBeNull();
expect(callback1).toHaveBeenCalledTimes(1); // initial call with null
expect(result2.success).toBe(false);
expect(result2.version).toBe(0);
expect(result2.record).toBeNull();
expect(callback2).not.toHaveBeenCalled();
});
test("client receives full updates by default", async () => {
const recordId = "test:record:full";
await client1.connect();
const updates: any[] = [];
const callback = (update: any) => {
updates.push(update);
};
await client1.subscribeRecord(recordId, callback);
const data1 = { count: 1 };
await server.publishRecordUpdate(recordId, data1);
await wait(50); // because pub/sub takes a bit
const data2 = { count: 2, name: "hello" };
await server.publishRecordUpdate(recordId, data2);
await wait(50);
expect(updates.length).toBe(3); // initial + 2 updates
expect(updates[0]).toEqual({ full: null, version: 0 });
expect(updates[1]).toEqual({ full: data1, version: 1 });
expect(updates[2]).toEqual({ full: data2, version: 2 });
});
test("client receives patch updates when mode is 'patch'", async () => {
const recordId = "test:record:patch";
await client1.connect();
const updates: any[] = [];
const callback = (update: any) => {
updates.push(update);
};
await client1.subscribeRecord(recordId, callback, { mode: "patch" });
const data1 = { count: 1 };
await server.publishRecordUpdate(recordId, data1);
await wait(50);
const data2 = { count: 1, name: "added" };
await server.publishRecordUpdate(recordId, data2);
await wait(50);
const data3 = { name: "added" };
await server.publishRecordUpdate(recordId, data3);
await wait(50);
expect(updates.length).toBe(4);
expect(updates[0]).toEqual({ full: null, version: 0 });
expect(updates[1]).toEqual({
patch: [{ op: "add", path: "/count", value: 1 }],
version: 1,
});
expect(updates[2]).toEqual({
patch: [{ op: "add", path: "/name", value: "added" }],
version: 2,
});
expect(updates[3]).toEqual({
patch: [{ op: "remove", path: "/count" }],
version: 3,
});
});
test("multiple clients receive updates based on their mode", async () => {
const recordId = "test:record:multi";
await client1.connect();
await client2.connect();
const updates1: any[] = [];
const callback1 = (update: any) => {
updates1.push(update);
};
await client1.subscribeRecord(recordId, callback1);
const updates2: any[] = [];
const callback2 = (update: any) => {
updates2.push(update);
};
await client2.subscribeRecord(recordId, callback2, { mode: "patch" });
const data1 = { value: "a" };
await server.publishRecordUpdate(recordId, data1);
await wait(100);
const data2 = { value: "b" };
await server.publishRecordUpdate(recordId, data2);
await wait(100);
// client 1 wants full updates
expect(updates1.length).toBe(3);
expect(updates1[0]).toEqual({ full: null, version: 0 });
expect(updates1[1]).toEqual({ full: data1, version: 1 });
expect(updates1[2]).toEqual({ full: data2, version: 2 });
// client 2 wants patches
expect(updates2.length).toBe(3);
expect(updates2[0]).toEqual({ full: null, version: 0 });
expect(updates2[1]).toEqual({
patch: [{ op: "add", path: "/value", value: "a" }],
version: 1,
});
expect(updates2[2]).toEqual({
patch: [{ op: "replace", path: "/value", value: "b" }],
version: 2,
});
});
test("client stops receiving updates after unsubscribing", async () => {
const recordId = "test:record:unsub";
await client1.connect();
const updates: any[] = [];
const callback = (update: any) => {
updates.push(update);
};
await client1.subscribeRecord(recordId, callback);
await server.publishRecordUpdate(recordId, { count: 1 });
await wait(50);
const unsubSuccess = await client1.unsubscribeRecord(recordId);
expect(unsubSuccess).toBe(true);
await server.publishRecordUpdate(recordId, { count: 2 });
await wait(50);
expect(updates.length).toBe(2);
expect(updates[0]).toEqual({ full: null, version: 0 });
expect(updates[1]).toEqual({ full: { count: 1 }, version: 1 });
});
test("desync detection triggers resubscribe (patch mode)", async () => {
const recordId = "test:record:desync";
await client1.connect();
const updates: any[] = [];
const callback = vi.fn((update: any) => {
updates.push(update);
});
// spy on resub attempt
const commandSpy = vi.spyOn(client1.connection, "command");
await client1.subscribeRecord(recordId, callback, { mode: "patch" }); // v0, initial full
// v1
await server.publishRecordUpdate(recordId, { count: 1 });
await wait(50); // client receives v1 patch
// publish v2 and v3 without notifying client via pub/sub
const v2Result = await server.recordManager.publishUpdate(recordId, {
count: 2,
});
const v3Result = await server.recordManager.publishUpdate(recordId, {
count: 3,
});
expect(v2Result?.version).toBe(2);
expect(v3Result?.version).toBe(3);
// publish v4 via the proper mechanism, while client expects v2
const data4 = { count: 4 };
await server.publishRecordUpdate(recordId, data4); // v4
await wait(100); // allocate time for desync handling
expect(callback).toHaveBeenCalledTimes(3); // v0, v1, v4
expect(updates[0]).toEqual({ full: null, version: 0 });
expect(updates[1]).toEqual({
patch: [{ op: "add", path: "/count", value: 1 }],
version: 1,
});
// third call is the full record after resync
expect(updates[2]).toEqual({ full: data4, version: 4 });
// verify unsubscribe and subscribe were called for resync
expect(commandSpy).toHaveBeenCalledWith(
"unsubscribe-record",
{ recordId },
30000
);
expect(commandSpy).toHaveBeenCalledWith(
"subscribe-record",
{
recordId,
mode: "patch",
},
30000
);
});
});