From fb4f275d58ccb190089b7f0381ce3aad916101a2 Mon Sep 17 00:00:00 2001 From: nvms Date: Sun, 20 Apr 2025 15:40:08 -0400 Subject: [PATCH] use pattern subscription for presence and add more tests --- packages/mesh/src/server/index.ts | 55 +++-- .../src/tests/presence-subscription.test.ts | 214 ++++++++++++++++++ 2 files changed, 244 insertions(+), 25 deletions(-) diff --git a/packages/mesh/src/server/index.ts b/packages/mesh/src/server/index.ts index fc10292..5997442 100644 --- a/packages/mesh/src/server/index.ts +++ b/packages/mesh/src/server/index.ts @@ -211,7 +211,8 @@ export class MeshServer extends WebSocketServer { const channel = `${PUB_SUB_CHANNEL_PREFIX}${this.instanceId}`; this._subscriptionPromise = new Promise((resolve, reject) => { - this.subClient.subscribe(channel, RECORD_PUB_SUB_CHANNEL, (err) => { + this.subClient.subscribe(channel, RECORD_PUB_SUB_CHANNEL); + this.subClient.psubscribe("mesh:presence:updates:*", (err) => { if (err) { if (!this._isShuttingDown) { console.error( @@ -231,18 +232,6 @@ export class MeshServer extends WebSocketServer { this.handleInstancePubSubMessage(channel, message); } else if (channel === RECORD_PUB_SUB_CHANNEL) { this.handleRecordUpdatePubSubMessage(message); - } else if (channel.startsWith("mesh:presence:updates:")) { - const roomName = channel.replace("mesh:presence:updates:", ""); - if (this.channelSubscriptions[channel]) { - for (const connection of this.channelSubscriptions[channel]) { - if (!connection.isDead) { - connection.send({ - command: "mesh/presence-update", - payload: JSON.parse(message), - }); - } - } - } } else if (this.channelSubscriptions[channel]) { for (const connection of this.channelSubscriptions[channel]) { if (!connection.isDead) { @@ -255,6 +244,34 @@ export class MeshServer extends WebSocketServer { } }); + this.subClient.on("pmessage", async (pattern, channel, message) => { + if (pattern === "mesh:presence:updates:*") { + // channel here is the actual channel, e.g., mesh:presence:updates:roomName + const subscribers = this.channelSubscriptions[channel]; + if (subscribers) { + try { + const payload = JSON.parse(message); + subscribers.forEach((connection) => { + if (!connection.isDead) { + connection.send({ + command: "mesh/presence-update", + payload: payload, + }); + } else { + // clean up dead connections from subscription list + subscribers.delete(connection); + } + }); + } catch (e) { + this.emit( + "error", + new Error(`Failed to parse presence update: ${message}`) + ); + } + } + } + }); + return this._subscriptionPromise; } @@ -816,12 +833,6 @@ export class MeshServer extends WebSocketServer { if (!this.channelSubscriptions[presenceChannel]) { this.channelSubscriptions[presenceChannel] = new Set(); - await new Promise((resolve, reject) => { - this.subClient.subscribe(presenceChannel, (err) => { - if (err) reject(err); - else resolve(); - }); - }); } this.channelSubscriptions[presenceChannel].add(ctx.connection); @@ -850,12 +861,6 @@ export class MeshServer extends WebSocketServer { this.channelSubscriptions[presenceChannel].delete(ctx.connection); if (this.channelSubscriptions[presenceChannel].size === 0) { - await new Promise((resolve, reject) => { - this.subClient.unsubscribe(presenceChannel, (err) => { - if (err) reject(err); - else resolve(); - }); - }); delete this.channelSubscriptions[presenceChannel]; } diff --git a/packages/mesh/src/tests/presence-subscription.test.ts b/packages/mesh/src/tests/presence-subscription.test.ts index d25090b..55ac0a2 100644 --- a/packages/mesh/src/tests/presence-subscription.test.ts +++ b/packages/mesh/src/tests/presence-subscription.test.ts @@ -244,3 +244,217 @@ describe("Presence Subscription", () => { expect(present).not.toContain(connection2.id); }); }); + +describe("Presence Subscription (Multiple Instances)", () => { + let serverA: MeshServer; + let serverB: MeshServer; + let clientA: MeshClient; + let clientB: MeshClient; + let clientC: MeshClient; + + const portA = 8141; + const portB = 8142; + const roomName = "test:room:multi-instance"; + + beforeEach(async () => { + await flushRedis(); + + serverA = createTestServer(portA); + serverB = createTestServer(portB); + + // track presence on both servers + [serverA, serverB].forEach((server) => { + server.trackPresence(roomName); + }); + + await serverA.ready(); + await serverB.ready(); + + // register join/leave commands on both servers + [serverA, serverB].forEach((server) => { + server.registerCommand("join-room", async (ctx) => { + const { roomName } = ctx.payload; + try { + await server.addToRoom(roomName, ctx.connection); + return { success: true }; + } catch (e) { + console.error(`[Test Setup] Failed to join room ${roomName}:`, e); + return { success: false }; + } + }); + server.registerCommand("leave-room", async (ctx) => { + const { roomName } = ctx.payload; + try { + await server.removeFromRoom(roomName, ctx.connection); + return { success: true }; + } catch (e) { + console.error(`[Test Setup] Failed to leave room ${roomName}:`, e); + return { success: false }; + } + }); + }); + + // server a client: + clientA = new MeshClient(`ws://localhost:${portA}`); + + // server b clients: + clientB = new MeshClient(`ws://localhost:${portB}`); + clientC = new MeshClient(`ws://localhost:${portB}`); + }); + + afterEach(async () => { + await clientA.close(); + await clientB.close(); + await clientC.close(); + await serverA.close(); + await serverB.close(); + }); + + test("join event propagates across instances", async () => { + await clientA.connect(); // srv a + await clientB.connect(); // srv b + + const connectionsB_Server = serverB.connectionManager.getLocalConnections(); + const clientBId = connectionsB_Server[0]?.id; + expect(clientBId).toBeDefined(); + + const callbackA = vi.fn(); + const { present: initialPresentA } = await clientA.subscribePresence( + roomName, + callbackA + ); + expect(initialPresentA).toEqual([]); // empty room + + const joinResultB = await clientB.command("join-room", { roomName }); + expect(joinResultB.success).toBe(true); + + await wait(150); + + // client a (srv a) receives join event from client b (srv b) + expect(callbackA).toHaveBeenCalledTimes(1); + expect(callbackA).toHaveBeenCalledWith( + expect.objectContaining({ + type: "join", + roomName: roomName, + connectionId: clientBId, + }) + ); + }, 10000); + + test("leave event propagates across instances", async () => { + await clientA.connect(); + await clientB.connect(); + + const connectionsB_Server = serverB.connectionManager.getLocalConnections(); + const clientBId = connectionsB_Server[0]?.id; + expect(clientBId).toBeDefined(); + + const callbackA = vi.fn(); + const { present: initialPresentA } = await clientA.subscribePresence( + roomName, + callbackA + ); + expect(initialPresentA).toEqual([]); + + await clientB.command("join-room", { roomName }); + await wait(150); + + // client a receives join event from client b + expect(callbackA).toHaveBeenCalledTimes(1); + expect(callbackA).toHaveBeenCalledWith( + expect.objectContaining({ type: "join", connectionId: clientBId }) + ); + + // client B leaves the room via srv b + const leaveResultB = await clientB.command("leave-room", { roomName }); + expect(leaveResultB.success).toBe(true); + + await wait(150); + + // client a (srv a) receives leave event from client b (srv b) + expect(callbackA).toHaveBeenCalledTimes(2); + expect(callbackA).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + type: "leave", + roomName: roomName, + connectionId: clientBId, + }) + ); + }, 10000); + + test("disconnect event propagates as leave across instances", async () => { + await clientA.connect(); + await clientB.connect(); + + const connectionsB_Server = serverB.connectionManager.getLocalConnections(); + const clientBId = connectionsB_Server[0]?.id; + expect(clientBId).toBeDefined(); + + const callbackA = vi.fn(); + const { present: initialPresentA } = await clientA.subscribePresence( + roomName, + callbackA + ); + expect(initialPresentA).toEqual([]); + + await clientB.command("join-room", { roomName }); + await wait(150); + + expect(callbackA).toHaveBeenCalledTimes(1); + expect(callbackA).toHaveBeenCalledWith( + expect.objectContaining({ type: "join", connectionId: clientBId }) + ); + + // client b disconnects from server b + await clientB.close(); + + await wait(150); + + // client a receives leave event from client b's disconnection + expect(callbackA).toHaveBeenCalledTimes(2); + expect(callbackA).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + type: "leave", + roomName: roomName, + connectionId: clientBId, + }) + ); + }, 10000); + + test("initial presence list includes users from other instances", async () => { + await clientA.connect(); + await clientB.connect(); + await clientC.connect(); + + const connectionsB_Server = serverB.connectionManager.getLocalConnections(); + const clientBId = connectionsB_Server[0]?.id; + const clientCId = connectionsB_Server[1]?.id; + expect(clientBId).toBeDefined(); + expect(clientCId).toBeDefined(); + + // client b -> srv b + await clientB.command("join-room", { roomName }); + // client c -> srv b + await clientC.command("join-room", { roomName }); + + await wait(150); + + // client a subscribes to presence from srv a + const callbackA = vi.fn(); + const { success, present } = await clientA.subscribePresence( + roomName, + callbackA + ); + + expect(success).toBe(true); + // initial list contains client b and c + expect(present.length).toBe(2); + expect(present).toContain(clientBId); + expect(present).toContain(clientCId); + + // callback not invoked yet because no events have occurred + expect(callbackA).not.toHaveBeenCalled(); + }, 10000); +});