feature: exposeWritableRecord and client.publishRecordUpdate

This commit is contained in:
nvms 2025-04-18 10:07:24 -04:00
parent 5a59182775
commit f9ccd98d39
4 changed files with 333 additions and 22 deletions

View File

@ -13,8 +13,10 @@ Mesh is a command-based WebSocket framework for real-time apps—whether you're
- [Metadata](#metadata)
- [Room Metadata](#room-metadata)
- [Record Subscriptions](#record-subscriptions)
- [Server Configuration](#server-configuration-1)
- [Updating Records](#updating-records)
- [Server Configuration (Read-Only)](#server-configuration-read-only)
- [Server Configuration (Writable)](#server-configuration-writable)
- [Updating Records (Server-Side)](#updating-records-server-side)
- [Updating Records (Client-Side)](#updating-records-client-side)
- [Client Usage — Full Mode (default)](#client-usage--full-mode-default)
- [Client Usage — Patch Mode](#client-usage--patch-mode)
- [Unsubscribing](#unsubscribing)
@ -280,9 +282,27 @@ server.exposeRecord(/^private:.+$/, async (conn, recordId) => {
});
```
### Updating Records
### Server Configuration (Writable)
Use `publishRecordUpdate()` to update the stored value, increment the version, generate a patch, and broadcast to all subscribed clients.
To allow clients to *subscribe* and also *modify* records, use `exposeWritableRecord`. This also accepts optional guard functions to control *write* access:
```ts
// Allow any connected client to write to cursor records
server.exposeWritableRecord(/^cursor:user:\d+$/);
// Allow only authenticated users to write to their profile
server.exposeWritableRecord(/^profile:user:\d+$/, async (conn, recordId) => {
const meta = await server.connectionManager.getMetadata(conn);
const recordUserId = recordId.split(':').pop();
return meta?.userId === recordUserId; // Check if user ID matches record ID
});
```
**Important:** Records exposed via `exposeWritableRecord` are automatically readable (subscribable) by clients. You don't need to call `exposeRecord` for the same pattern. However, if you want different guards for reading and writing, you can expose the same pattern with both methods, each with its own guard.
### Updating Records (Server-Side)
Use `publishRecordUpdate()` from the server to update the stored value, increment the version, generate a patch, and broadcast to all subscribed clients (both read-only and writable).
```ts
await server.publishRecordUpdate("user:123", {
@ -298,6 +318,29 @@ await server.publishRecordUpdate("user:123", {
});
```
### Updating Records (Client-Side)
If a record has been exposed as writable via `exposeWritableRecord` on the server (and any guard function passes), clients can publish updates using the `publishRecordUpdate` method:
```ts
const userId = '123';
const success = await client.publishRecordUpdate(`cursor:user:${userId}`, {
x: 100,
y: 250,
timestamp: Date.now(),
});
if (success) {
console.log("Cursor position updated successfully.");
} else {
console.error("Failed to update cursor position (maybe permission denied?).");
}
```
This client-initiated update will be processed by the server, which then uses the same `publishRecordUpdate` mechanism internally to persist the change and broadcast it (as a full value or patch) to all other subscribed clients. The method returns `true` if the server accepted the write, and `false` if it was rejected (e.g., due to a failed guard).
**Note:** When a client publishes an update to a record using `publishRecordUpdate`, it will also receive that update through its subscription callback just like any other client. This ensures consistency and simplifies update handling. If your app logic already applies local updates optimistically, you may choose to ignore redundant self-updates in your callback.
### Client Usage — Full Mode (default)
In `full` mode, the client receives the entire updated record every time. This is simpler to use and ideal for small records or when patching isn't needed.

View File

@ -97,27 +97,19 @@ export class MeshClient extends EventEmitter {
private setupConnectionEvents(): void {
this.connection.on("message", (data) => {
// data is the parsed command object
this.emit("message", data); // Emit generic message event
this.emit("message", data);
// Handle specific commands first
if (data.command === "record-update") {
this.handleRecordUpdate(data.payload);
// Optionally emit the specific event if needed elsewhere
// this.emit(data.command, data.payload);
} else if (data.command === "subscription-message") {
// Let the specific listener in subscribe() handle this
// Emit it here for the existing subscribe logic to work
this.emit(data.command, data.payload);
} else {
// Handle other non-system commands by emitting events
const systemCommands = [
"ping",
"pong",
"latency",
"latency:request",
"latency:response",
// 'subscription-message' and 'record-update' are handled above
];
if (data.command && !systemCommands.includes(data.command)) {
this.emit(data.command, data.payload);
@ -503,7 +495,7 @@ export class MeshClient extends EventEmitter {
localVersion: result.version,
mode,
});
// Immediately call callback with the initial full record
await callback({
recordId,
full: result.record,
@ -546,4 +538,27 @@ export class MeshClient extends EventEmitter {
return false;
}
}
/**
* Publishes an update to a specific record if the client has write permissions.
*
* @param {string} recordId - The ID of the record to update.
* @param {any} newValue - The new value for the record.
* @returns {Promise<boolean>} True if the update was successfully published, false otherwise.
*/
async publishRecordUpdate(recordId: string, newValue: any): Promise<boolean> {
try {
const result = await this.command("publish-record-update", {
recordId,
newValue,
});
return result.success === true;
} catch (error) {
console.error(
`[MeshClient] Failed to publish update for record ${recordId}:`,
error
);
return false;
}
}
}

View File

@ -93,6 +93,7 @@ export class MeshServer extends WebSocketServer {
status: Status = Status.OFFLINE;
private exposedChannels: ChannelPattern[] = [];
private exposedRecords: ChannelPattern[] = [];
private exposedWritableRecords: ChannelPattern[] = []; // New: Track writable records
private channelSubscriptions: { [channel: string]: Set<Connection> } = {};
private recordSubscriptions: Map<
string, // recordId
@ -106,6 +107,11 @@ export class MeshServer extends WebSocketServer {
ChannelPattern,
(connection: Connection, recordId: string) => Promise<boolean> | boolean
> = new Map();
private writableRecordGuards: Map<
// New: Guards for writable records
ChannelPattern,
(connection: Connection, recordId: string) => Promise<boolean> | boolean
> = new Map();
private _isShuttingDown = false;
commands: {
@ -223,7 +229,6 @@ export class MeshServer extends WebSocketServer {
} else if (channel === RECORD_PUB_SUB_CHANNEL) {
this.handleRecordUpdatePubSubMessage(message);
} else if (this.channelSubscriptions[channel]) {
// Handle regular channel subscriptions
for (const connection of this.channelSubscriptions[channel]) {
if (!connection.isDead) {
connection.send({
@ -276,8 +281,9 @@ export class MeshServer extends WebSocketServer {
}
const subscribers = this.recordSubscriptions.get(recordId);
if (!subscribers) {
return; // No local subscribers for this record
return;
}
subscribers.forEach((mode, connectionId) => {
@ -296,7 +302,6 @@ export class MeshServer extends WebSocketServer {
});
}
} else if (!connection) {
// Clean up stale subscription if connection no longer exists locally
subscribers.delete(connectionId);
if (subscribers.size === 0) {
this.recordSubscriptions.delete(recordId);
@ -384,7 +389,6 @@ export class MeshServer extends WebSocketServer {
channel: string,
connection: Connection
): Promise<boolean> {
// First check if the channel matches any exposed pattern
const matchedPattern = this.exposedChannels.find((pattern) =>
typeof pattern === "string" ? pattern === channel : pattern.test(channel)
);
@ -428,7 +432,69 @@ export class MeshServer extends WebSocketServer {
recordId: string,
connection: Connection
): Promise<boolean> {
const matchedPattern = this.exposedRecords.find((pattern) =>
const readPattern = this.exposedRecords.find((pattern) =>
typeof pattern === "string"
? pattern === recordId
: pattern.test(recordId)
);
let canRead = false;
if (readPattern) {
const guard = this.recordGuards.get(readPattern);
if (guard) {
try {
canRead = await Promise.resolve(guard(connection, recordId));
} catch (e) {
canRead = false;
}
} else {
canRead = true;
}
}
if (canRead) {
return true;
}
// if exposed as writable, it is implicitly readable
const writePattern = this.exposedWritableRecords.find((pattern) =>
typeof pattern === "string"
? pattern === recordId
: pattern.test(recordId)
);
// If exposed as writable, it's readable. No need to check the *write* guard here.
if (writePattern) {
return true;
}
return false;
}
/**
* Exposes a record or pattern for client writes, optionally adding a guard function.
*
* @param {ChannelPattern} recordPattern - The record ID or pattern to expose as writable.
* @param {(connection: Connection, recordId: string) => Promise<boolean> | boolean} [guard] - Optional guard function.
*/
exposeWritableRecord(
recordPattern: ChannelPattern,
guard?: (
connection: Connection,
recordId: string
) => Promise<boolean> | boolean
): void {
this.exposedWritableRecords.push(recordPattern);
if (guard) {
this.writableRecordGuards.set(recordPattern, guard);
}
}
private async isRecordWritable(
recordId: string,
connection: Connection
): Promise<boolean> {
const matchedPattern = this.exposedWritableRecords.find((pattern) =>
typeof pattern === "string"
? pattern === recordId
: pattern.test(recordId)
@ -438,7 +504,7 @@ export class MeshServer extends WebSocketServer {
return false;
}
const guard = this.recordGuards.get(matchedPattern);
const guard = this.writableRecordGuards.get(matchedPattern);
if (guard) {
try {
return await Promise.resolve(guard(connection, recordId));
@ -489,14 +555,14 @@ export class MeshServer extends WebSocketServer {
);
if (!updateResult) {
return; // No change detected
return;
}
const { patch, version } = updateResult;
const messagePayload: RecordUpdatePubSubPayload = {
recordId,
newValue, // Always include newValue for 'full' subscribers
newValue,
patch,
version,
};
@ -653,6 +719,33 @@ export class MeshServer extends WebSocketServer {
return false;
}
);
// New command for client-initiated record updates
this.registerCommand<
{ recordId: string; newValue: any },
{ success: boolean }
>("publish-record-update", async (ctx) => {
const { recordId, newValue } = ctx.payload;
if (!(await this.isRecordWritable(recordId, ctx.connection))) {
throw new CodeError(
`Record "${recordId}" is not writable by this connection.`,
"EACCESS",
"PermissionError"
);
}
try {
await this.publishRecordUpdate(recordId, newValue);
return { success: true };
} catch (e: any) {
throw new CodeError(
`Failed to publish update for record "${recordId}": ${e.message}`,
"EUPDATE",
"UpdateError"
);
}
});
}
/**

View File

@ -39,6 +39,8 @@ describe("Record Subscription", () => {
server = createTestServer(port);
server.exposeRecord(/^test:record:.*/);
server.exposeRecord("guarded:record");
server.exposeWritableRecord(/^writable:record:.*/);
server.exposeWritableRecord("guarded:writable");
await server.ready();
client1 = new MeshClient(`ws://localhost:${port}`);
@ -321,4 +323,162 @@ describe("Record Subscription", () => {
30000
);
});
test("client can write to an exposed writable record", async () => {
const recordId = "writable:record:1";
await client1.connect();
await client2.connect();
const updatesClient2: any[] = [];
const callbackClient2 = vi.fn((update: any) => {
updatesClient2.push(update);
});
// check subscription success and initial call
const subResult = await client2.subscribeRecord(recordId, callbackClient2); // Subscribe before write
expect(subResult.success).toBe(true);
expect(subResult.record).toBeNull();
expect(subResult.version).toBe(0);
expect(callbackClient2).toHaveBeenCalledTimes(1);
expect(callbackClient2).toHaveBeenCalledWith({
recordId,
full: null,
version: 0,
});
const initialData = { value: "initial" };
// client 1 writes
const success = await client1.publishRecordUpdate(recordId, initialData);
expect(success).toBe(true);
await wait(150);
// client 2 received the update (initial call + 1 update)
expect(callbackClient2).toHaveBeenCalledTimes(2);
expect(updatesClient2.length).toBe(2);
expect(updatesClient2[1]).toEqual({
recordId,
full: initialData,
version: 1,
});
// verify server state
const { record, version } = await server.recordManager.getRecordAndVersion(
recordId
);
expect(record).toEqual(initialData);
expect(version).toBe(1);
});
test("client cannot write to a non-writable record (read-only exposed)", async () => {
const recordId = "test:record:readonly"; // exposed via exposeRecord, not exposeWritableRecord
await client1.connect();
const initialData = { value: "attempt" };
const success = await client1.publishRecordUpdate(recordId, initialData);
expect(success).toBe(false);
// verify server state hasn't changed
const { record, version } = await server.recordManager.getRecordAndVersion(
recordId
);
expect(record).toBeNull();
expect(version).toBe(0);
});
test("client cannot write to a record not exposed at all", async () => {
const recordId = "not:exposed:at:all";
await client1.connect();
const initialData = { value: "attempt" };
const success = await client1.publishRecordUpdate(recordId, initialData);
expect(success).toBe(false);
const { record, version } = await server.recordManager.getRecordAndVersion(
recordId
);
expect(record).toBeNull();
expect(version).toBe(0);
});
test("writable record guard prevents unauthorized writes", async () => {
const recordId = "guarded:writable";
await client1.connect();
await client2.connect();
const connections = server.connectionManager.getLocalConnections();
const connection1Id = connections[0]?.id;
// only client1 can write this record
server.exposeWritableRecord(
recordId,
(connection, recId) => connection.id === connection1Id
);
const data1 = { value: "from client 1" };
const success1 = await client1.publishRecordUpdate(recordId, data1);
expect(success1).toBe(true);
await wait(50);
let serverState = await server.recordManager.getRecordAndVersion(recordId);
expect(serverState.record).toEqual(data1);
expect(serverState.version).toBe(1);
const data2 = { value: "from client 2" };
const success2 = await client2.publishRecordUpdate(recordId, data2);
expect(success2).toBe(false);
await wait(50);
serverState = await server.recordManager.getRecordAndVersion(recordId);
expect(serverState.record).toEqual(data1); // unchanged
expect(serverState.version).toBe(1); // unchanged
});
test("update from client write propagates to other subscribed clients", async () => {
const recordId = "writable:record:propagate";
await client1.connect(); // writer
await client2.connect(); // subscriber
const updatesClient2: any[] = [];
const callbackClient2 = vi.fn((update: any) => {
updatesClient2.push(update);
});
const subResult = await client2.subscribeRecord(recordId, callbackClient2, {
mode: "patch",
});
expect(subResult.success).toBe(true);
expect(subResult.record).toBeNull();
expect(subResult.version).toBe(0);
expect(callbackClient2).toHaveBeenCalledTimes(1);
expect(callbackClient2).toHaveBeenCalledWith({
recordId,
full: null,
version: 0,
});
// client 1 writes
const data1 = { count: 1 };
await client1.publishRecordUpdate(recordId, data1);
await wait(100);
const data2 = { count: 1, name: "added" };
await client1.publishRecordUpdate(recordId, data2);
await wait(150);
// client 2 received the patches (initial call + 2 patches)
expect(callbackClient2).toHaveBeenCalledTimes(3);
expect(updatesClient2.length).toBe(3);
expect(updatesClient2[1]).toEqual({
recordId,
patch: [{ op: "add", path: "/count", value: 1 }],
version: 1,
});
expect(updatesClient2[2]).toEqual({
recordId,
patch: [{ op: "add", path: "/name", value: "added" }],
version: 2,
});
});
});