include the record identifier in the subscribeRecord callback

This commit is contained in:
nvms 2025-04-18 09:22:51 -04:00
parent 10c18f668e
commit 5a59182775
4 changed files with 48 additions and 20 deletions

View File

@ -308,8 +308,12 @@ let userProfile = {};
const { success, record, version } = await client.subscribeRecord( const { success, record, version } = await client.subscribeRecord(
"user:123", "user:123",
(update) => { (update) => {
// update contains { recordId, full, version }
userProfile = update.full; userProfile = update.full;
console.log(`Received full update v${update.version}:`, update.full); console.log(
`Received full update for ${update.recordId} v${update.version}:`,
update.full
);
} }
); );
@ -330,14 +334,17 @@ let productData = {};
const { success, record, version } = await client.subscribeRecord( const { success, record, version } = await client.subscribeRecord(
"product:456", "product:456",
(update) => { (update) => {
// update contains { recordId, patch?, full?, version }
if (update.patch) { if (update.patch) {
// normally youll receive `patch`, but if the client falls out of sync, // normally youll receive `patch`, but if the client falls out of sync,
// the server will send a full update instead to resynchronize. // the server will send a full update instead to resynchronize.
applyPatch(productData, update.patch); applyPatch(productData, update.patch);
console.log(`Applied patch v${update.version}`); console.log(`Applied patch for ${update.recordId} v${update.version}`);
} else { } else {
productData = update.full; productData = update.full;
console.log(`Received full (resync) v${update.version}`); console.log(
`Received full (resync) for ${update.recordId} v${update.version}`
);
} }
}, },
{ mode: "patch" } { mode: "patch" }

View File

@ -1,6 +1,6 @@
{ {
"name": "@prsm/mesh", "name": "@prsm/mesh",
"version": "1.0.2", "version": "1.0.3",
"type": "module", "type": "module",
"exports": { "exports": {
"./server": { "./server": {

View File

@ -66,6 +66,7 @@ export class MeshClient extends EventEmitter {
string, // recordId string, // recordId
{ {
callback: (update: { callback: (update: {
recordId: string;
full?: any; full?: any;
patch?: Operation[]; patch?: Operation[];
version: number; version: number;
@ -409,14 +410,14 @@ export class MeshClient extends EventEmitter {
} }
subscription.localVersion = version; subscription.localVersion = version;
await subscription.callback({ patch, version }); await subscription.callback({ recordId, patch, version });
return; return;
} }
if (full !== undefined) { if (full !== undefined) {
subscription.localVersion = version; subscription.localVersion = version;
await subscription.callback({ full, version }); await subscription.callback({ recordId, full, version });
} }
} }
@ -484,6 +485,7 @@ export class MeshClient extends EventEmitter {
async subscribeRecord( async subscribeRecord(
recordId: string, recordId: string,
callback: (update: { callback: (update: {
recordId: string;
full?: any; full?: any;
patch?: Operation[]; patch?: Operation[];
version: number; version: number;
@ -502,7 +504,11 @@ export class MeshClient extends EventEmitter {
mode, mode,
}); });
// Immediately call callback with the initial full record // Immediately call callback with the initial full record
await callback({ full: result.record, version: result.version }); await callback({
recordId,
full: result.record,
version: result.version,
});
} }
return { return {

View File

@ -70,7 +70,11 @@ describe("Record Subscription", () => {
// callback is called once initially with the full record // callback is called once initially with the full record
expect(callback).toHaveBeenCalledTimes(1); expect(callback).toHaveBeenCalledTimes(1);
expect(callback).toHaveBeenCalledWith({ full: initialData, version: 1 }); expect(callback).toHaveBeenCalledWith({
recordId,
full: initialData,
version: 1,
});
}); });
test("client cannot subscribe to an unexposed record", async () => { test("client cannot subscribe to an unexposed record", async () => {
@ -109,6 +113,11 @@ describe("Record Subscription", () => {
expect(result1.version).toBe(0); // nothing published yet expect(result1.version).toBe(0); // nothing published yet
expect(result1.record).toBeNull(); expect(result1.record).toBeNull();
expect(callback1).toHaveBeenCalledTimes(1); // initial call with null expect(callback1).toHaveBeenCalledTimes(1); // initial call with null
expect(callback1).toHaveBeenCalledWith({
recordId: "guarded:record",
full: null,
version: 0,
});
expect(result2.success).toBe(false); expect(result2.success).toBe(false);
expect(result2.version).toBe(0); expect(result2.version).toBe(0);
@ -136,9 +145,9 @@ describe("Record Subscription", () => {
await wait(50); await wait(50);
expect(updates.length).toBe(3); // initial + 2 updates expect(updates.length).toBe(3); // initial + 2 updates
expect(updates[0]).toEqual({ full: null, version: 0 }); expect(updates[0]).toEqual({ recordId, full: null, version: 0 });
expect(updates[1]).toEqual({ full: data1, version: 1 }); expect(updates[1]).toEqual({ recordId, full: data1, version: 1 });
expect(updates[2]).toEqual({ full: data2, version: 2 }); expect(updates[2]).toEqual({ recordId, full: data2, version: 2 });
}); });
test("client receives patch updates when mode is 'patch'", async () => { test("client receives patch updates when mode is 'patch'", async () => {
@ -165,16 +174,19 @@ describe("Record Subscription", () => {
await wait(50); await wait(50);
expect(updates.length).toBe(4); expect(updates.length).toBe(4);
expect(updates[0]).toEqual({ full: null, version: 0 }); expect(updates[0]).toEqual({ recordId, full: null, version: 0 });
expect(updates[1]).toEqual({ expect(updates[1]).toEqual({
recordId,
patch: [{ op: "add", path: "/count", value: 1 }], patch: [{ op: "add", path: "/count", value: 1 }],
version: 1, version: 1,
}); });
expect(updates[2]).toEqual({ expect(updates[2]).toEqual({
recordId,
patch: [{ op: "add", path: "/name", value: "added" }], patch: [{ op: "add", path: "/name", value: "added" }],
version: 2, version: 2,
}); });
expect(updates[3]).toEqual({ expect(updates[3]).toEqual({
recordId,
patch: [{ op: "remove", path: "/count" }], patch: [{ op: "remove", path: "/count" }],
version: 3, version: 3,
}); });
@ -207,18 +219,20 @@ describe("Record Subscription", () => {
// client 1 wants full updates // client 1 wants full updates
expect(updates1.length).toBe(3); expect(updates1.length).toBe(3);
expect(updates1[0]).toEqual({ full: null, version: 0 }); expect(updates1[0]).toEqual({ recordId, full: null, version: 0 });
expect(updates1[1]).toEqual({ full: data1, version: 1 }); expect(updates1[1]).toEqual({ recordId, full: data1, version: 1 });
expect(updates1[2]).toEqual({ full: data2, version: 2 }); expect(updates1[2]).toEqual({ recordId, full: data2, version: 2 });
// client 2 wants patches // client 2 wants patches
expect(updates2.length).toBe(3); expect(updates2.length).toBe(3);
expect(updates2[0]).toEqual({ full: null, version: 0 }); expect(updates2[0]).toEqual({ recordId, full: null, version: 0 });
expect(updates2[1]).toEqual({ expect(updates2[1]).toEqual({
recordId,
patch: [{ op: "add", path: "/value", value: "a" }], patch: [{ op: "add", path: "/value", value: "a" }],
version: 1, version: 1,
}); });
expect(updates2[2]).toEqual({ expect(updates2[2]).toEqual({
recordId,
patch: [{ op: "replace", path: "/value", value: "b" }], patch: [{ op: "replace", path: "/value", value: "b" }],
version: 2, version: 2,
}); });
@ -245,8 +259,8 @@ describe("Record Subscription", () => {
await wait(50); await wait(50);
expect(updates.length).toBe(2); expect(updates.length).toBe(2);
expect(updates[0]).toEqual({ full: null, version: 0 }); expect(updates[0]).toEqual({ recordId, full: null, version: 0 });
expect(updates[1]).toEqual({ full: { count: 1 }, version: 1 }); expect(updates[1]).toEqual({ recordId, full: { count: 1 }, version: 1 });
}); });
test("desync detection triggers resubscribe (patch mode)", async () => { test("desync detection triggers resubscribe (patch mode)", async () => {
@ -283,13 +297,14 @@ describe("Record Subscription", () => {
await wait(100); // allocate time for desync handling await wait(100); // allocate time for desync handling
expect(callback).toHaveBeenCalledTimes(3); // v0, v1, v4 expect(callback).toHaveBeenCalledTimes(3); // v0, v1, v4
expect(updates[0]).toEqual({ full: null, version: 0 }); expect(updates[0]).toEqual({ recordId, full: null, version: 0 });
expect(updates[1]).toEqual({ expect(updates[1]).toEqual({
recordId,
patch: [{ op: "add", path: "/count", value: 1 }], patch: [{ op: "add", path: "/count", value: 1 }],
version: 1, version: 1,
}); });
// third call is the full record after resync // third call is the full record after resync
expect(updates[2]).toEqual({ full: data4, version: 4 }); expect(updates[2]).toEqual({ recordId, full: data4, version: 4 });
// verify unsubscribe and subscribe were called for resync // verify unsubscribe and subscribe were called for resync
expect(commandSpy).toHaveBeenCalledWith( expect(commandSpy).toHaveBeenCalledWith(