diff --git a/packages/mesh/package.json b/packages/mesh/package.json index ac2ea6d..0949eea 100644 --- a/packages/mesh/package.json +++ b/packages/mesh/package.json @@ -12,6 +12,11 @@ "types": "./dist/client/index.d.ts", "import": "./dist/client/index.js", "require": "./dist/client/index.cjs" + }, + "./client-utils": { + "types": "./dist/client-utils/index.d.ts", + "import": "./dist/client-utils/index.js", + "require": "./dist/client-utils/index.cjs" } }, "typesVersions": { @@ -21,14 +26,18 @@ ], "client": [ "dist/client/index.d.ts" + ], + "client-utils": [ + "dist/client-utils/index.d.ts" ] } }, "scripts": { - "build": "bun run build:prep && bun run build:server && bun run build:client", + "build": "bun run build:prep && bun run build:server && bun run build:client && bun run build:client-utils", "build:client": "tsup src/client/index.ts --format cjs,esm --dts --clean --minify --out-dir dist/client", - "build:prep": "rm -rf dist && mkdir dist && mkdir dist/server && mkdir dist/client", + "build:prep": "rm -rf dist && mkdir dist && mkdir dist/server && mkdir dist/client && mkdir dist/client-utils", "build:server": "tsup src/server/index.ts --format cjs,esm --dts --clean --minify --out-dir dist/server", + "build:client-utils": "tsup src/client-utils/index.ts --format cjs,esm --dts --clean --minify --out-dir dist/client-utils", "test": "vitest" }, "dependencies": { diff --git a/packages/mesh/src/client-utils/README.md b/packages/mesh/src/client-utils/README.md new file mode 100644 index 0000000..878b878 --- /dev/null +++ b/packages/mesh/src/client-utils/README.md @@ -0,0 +1,80 @@ +## Deduplicated Presence + +Sometimes, a single user may have multiple connections (tabs, devices) in a room. By default, `subscribePresence(...)` emits events for each connection individually — so a single user might appear multiple times. + +The `createDedupedPresenceHandler` utility helps you group those events into a single presence entry per logical entity — such as a user — using whatever logic you define. + +This is useful for: + +- Showing a clean “who’s online” list +- Displaying a single “typing...” indicator per user +- Tracking presence by user, session, device, or any custom identifier + +### Usage + +```ts +import { createDedupedPresenceHandler } from "@prsm/mesh/client-utils"; +import { client } from "./client"; // your MeshClient instance + +const handler = createDedupedPresenceHandler({ + getGroupId: async (connectionId) => { + // Group by userId if available, otherwise fallback to connectionId + const metadata = await client.getConnectionMetadata(connectionId); + return metadata.userId ?? connectionId; + }, + onUpdate: (groups) => { + // `groups` is a Map + const users = Array.from(groups.entries()).map(([groupId, group]) => ({ + id: groupId, + state: group.state, + tabCount: group.members.size, + })); + + // Defined below + renderPresenceList(users); + }, +}); + +await client.subscribePresence("room:chat", handler); +``` + +**What does `groups` contain?** + +Each `group` looks like this: + +```ts +{ + representative: "conn123", // Most recent connection to update state + state: { status: "typing" }, // Most recent presence state (or null) + timestamp: 1713748000000, // Time of last state update + members: new Set(["conn123", "conn456"]) // All connections in the group +} +``` + +You can group by basically anything in `getGroupId` — connection metadata, session cookies, localStorage — it’s up to you. In the example above, we’re grouping by `userId` if present, or falling back to `connectionId` so that all connections are still shown individually when needed. + +### Rendering to the DOM + +Here’s a simple example that displays deduplicated users in the UI: + +```ts +function renderPresenceList(users) { + const container = document.querySelector("#presence"); + container.innerHTML = users + .map((user) => { + const status = user.state?.status ?? "idle"; + return ` +
+ ${user.id}: ${status} (tabs: ${user.tabCount}) +
`; + }) + .join(""); +} +``` + +Shows something like: + +```ts +Alice: typing (tabs: 2) +conn-m9sdkxww000007079ff77: idle (tabs: 1) +``` diff --git a/packages/mesh/src/client-utils/index.ts b/packages/mesh/src/client-utils/index.ts new file mode 100644 index 0000000..14578e8 --- /dev/null +++ b/packages/mesh/src/client-utils/index.ts @@ -0,0 +1,68 @@ +import type { PresenceUpdate } from "../client/client"; + +type DedupedPresenceGroup = { + representative: string; + state: any | null; + timestamp: number | null; + members: Set; +}; + +export interface CreateDedupedPresenceHandlerOptions { + getGroupId: (connectionId: string) => Promise; + onUpdate: (groups: Map) => void; +} + +export function createDedupedPresenceHandler( + options: CreateDedupedPresenceHandlerOptions +) { + const { getGroupId, onUpdate } = options; + + const groupMap = new Map(); + const connectionToGroup = new Map(); + + return async (update: PresenceUpdate) => { + const { connectionId, type, timestamp = Date.now() } = update; + + let groupId = connectionToGroup.get(connectionId); + if (!groupId) { + groupId = (await getGroupId(connectionId)) ?? `conn:${connectionId}`; + connectionToGroup.set(connectionId, groupId); + } + + let group = groupMap.get(groupId); + + if (type === "join") { + if (!group) { + group = { + representative: connectionId, + state: null, + timestamp: null, + members: new Set(), + }; + groupMap.set(groupId, group); + } + group.members.add(connectionId); + } + + if (type === "leave" && group) { + group.members.delete(connectionId); + + if (group.members.size === 0) { + groupMap.delete(groupId); + } else if (group.representative === connectionId) { + group.representative = group.members.values().next().value!; + } + } + + if (type === "state" && group) { + const { state } = update; + if (!group.timestamp || timestamp >= group.timestamp) { + group.state = state; + group.timestamp = timestamp; + group.representative = connectionId; + } + } + + onUpdate(groupMap); + }; +} diff --git a/packages/mesh/src/tests/client-utils.test.ts b/packages/mesh/src/tests/client-utils.test.ts new file mode 100644 index 0000000..19ed115 --- /dev/null +++ b/packages/mesh/src/tests/client-utils.test.ts @@ -0,0 +1,334 @@ +import { describe, test, expect, vi } from "vitest"; +import { createDedupedPresenceHandler } from "../client-utils"; +import type { PresenceUpdate } from "../client/client"; + +describe("createDedupedPresenceHandler", () => { + test("adds a new group when a connection joins and is resolved to a new groupId", async () => { + const getGroupId = vi + .fn() + .mockImplementation( + async (connectionId) => `group:${connectionId.substring(0, 3)}` + ); + + const onUpdate = vi.fn(); + + const handler = createDedupedPresenceHandler({ + getGroupId, + onUpdate, + }); + + const update: PresenceUpdate = { + type: "join", + connectionId: "conn123", + roomName: "test-room", + timestamp: 1000, + }; + + await handler(update); + + expect(getGroupId).toHaveBeenCalledWith("conn123"); + expect(onUpdate).toHaveBeenCalledTimes(1); + + expect(onUpdate).toHaveBeenCalled(); + const groupMap = onUpdate.mock.calls![0]![0] as Map; + expect(groupMap.size).toBe(1); + expect(groupMap.has("group:con")).toBe(true); + + const group = groupMap.get("group:con"); + expect(group.representative).toBe("conn123"); + expect(group.members.size).toBe(1); + expect(group.members.has("conn123")).toBe(true); + }); + + test("adds the connection to an existing group if another connection already resolved to the same groupId", async () => { + const getGroupId = vi + .fn() + .mockImplementation(async (connectionId) => "group:same"); + + const onUpdate = vi.fn(); + + const handler = createDedupedPresenceHandler({ + getGroupId, + onUpdate, + }); + + // first connection joins + await handler({ + type: "join", + connectionId: "conn123", + roomName: "test-room", + timestamp: 1000, + }); + + onUpdate.mockClear(); + + // second connection joins with same group ID + await handler({ + type: "join", + connectionId: "conn456", + roomName: "test-room", + timestamp: 1001, + }); + + expect(getGroupId).toHaveBeenCalledWith("conn456"); + expect(onUpdate).toHaveBeenCalledTimes(1); + + expect(onUpdate).toHaveBeenCalled(); + const groupMap = onUpdate.mock.calls![0]![0] as Map; + expect(groupMap.size).toBe(1); + + const group = groupMap.get("group:same"); + // first connection remains the representative + expect(group.representative).toBe("conn123"); + expect(group.members.size).toBe(2); + expect(group.members.has("conn123")).toBe(true); + expect(group.members.has("conn456")).toBe(true); + }); + + test("removes the group when the last connection in that group leaves", async () => { + const getGroupId = vi + .fn() + .mockImplementation(async (connectionId) => "group:test"); + + const onUpdate = vi.fn(); + + const handler = createDedupedPresenceHandler({ + getGroupId, + onUpdate, + }); + + // connection joins + await handler({ + type: "join", + connectionId: "conn123", + roomName: "test-room", + timestamp: 1000, + }); + + onUpdate.mockClear(); + + // connection leaves + await handler({ + type: "leave", + connectionId: "conn123", + roomName: "test-room", + timestamp: 1001, + }); + + expect(onUpdate).toHaveBeenCalledTimes(1); + + expect(onUpdate).toHaveBeenCalled(); + const groupMap = onUpdate.mock.calls![0]![0] as Map; + // group should be removed + expect(groupMap.size).toBe(0); + }); + + test("promotes a new representative when the current representative leaves", async () => { + const getGroupId = vi + .fn() + .mockImplementation(async (connectionId) => "group:test"); + + const onUpdate = vi.fn(); + + const handler = createDedupedPresenceHandler({ + getGroupId, + onUpdate, + }); + + // first connection joins (becomes representative) + await handler({ + type: "join", + connectionId: "conn123", + roomName: "test-room", + timestamp: 1000, + }); + + // second connection joins + await handler({ + type: "join", + connectionId: "conn456", + roomName: "test-room", + timestamp: 1001, + }); + + onUpdate.mockClear(); + + // representative leaves + await handler({ + type: "leave", + connectionId: "conn123", + roomName: "test-room", + timestamp: 1002, + }); + + expect(onUpdate).toHaveBeenCalledTimes(1); + + expect(onUpdate).toHaveBeenCalled(); + const groupMap = onUpdate.mock.calls![0]![0] as Map; + expect(groupMap.size).toBe(1); + + const group = groupMap.get("group:test"); + // second connection should be promoted + expect(group.representative).toBe("conn456"); + expect(group.members.size).toBe(1); + expect(group.members.has("conn456")).toBe(true); + }); + + test("updates state when a state update is received", async () => { + const getGroupId = vi + .fn() + .mockImplementation(async (connectionId) => "group:test"); + + const onUpdate = vi.fn(); + + const handler = createDedupedPresenceHandler({ + getGroupId, + onUpdate, + }); + + // connection joins + await handler({ + type: "join", + connectionId: "conn123", + roomName: "test-room", + timestamp: 1000, + }); + + onUpdate.mockClear(); + + // connection updates state + await handler({ + type: "state", + connectionId: "conn123", + roomName: "test-room", + timestamp: 1001, + state: { status: "typing" }, + }); + + expect(onUpdate).toHaveBeenCalledTimes(1); + + expect(onUpdate).toHaveBeenCalled(); + const groupMap = onUpdate.mock.calls![0]![0] as Map; + const group = groupMap.get("group:test"); + expect(group.state).toEqual({ status: "typing" }); + expect(group.timestamp).toBe(1001); + }); + + test("only updates state if timestamp is newer", async () => { + const getGroupId = vi + .fn() + .mockImplementation(async (connectionId) => "group:test"); + + const onUpdate = vi.fn(); + + const handler = createDedupedPresenceHandler({ + getGroupId, + onUpdate, + }); + + // two connections join the same group + await handler({ + type: "join", + connectionId: "conn123", + roomName: "test-room", + timestamp: 1000, + }); + + await handler({ + type: "join", + connectionId: "conn456", + roomName: "test-room", + timestamp: 1001, + }); + + // first connection sets state + await handler({ + type: "state", + connectionId: "conn123", + roomName: "test-room", + timestamp: 1002, + state: { status: "typing" }, + }); + + onUpdate.mockClear(); + + // second connection tries to set state with older timestamp + await handler({ + type: "state", + connectionId: "conn456", + roomName: "test-room", + timestamp: 1001, + state: { status: "idle" }, + }); + + expect(onUpdate).toHaveBeenCalledTimes(1); + + expect(onUpdate).toHaveBeenCalled(); + const groupMap = onUpdate.mock.calls![0]![0] as Map; + const group = groupMap.get("group:test"); + // should keep the first state + expect(group.state).toEqual({ status: "typing" }); + expect(group.timestamp).toBe(1002); + // representative should not change + expect(group.representative).toBe("conn123"); + }); + + test("changes representative when state is updated with newer timestamp", async () => { + const getGroupId = vi + .fn() + .mockImplementation(async (connectionId) => "group:test"); + + const onUpdate = vi.fn(); + + const handler = createDedupedPresenceHandler({ + getGroupId, + onUpdate, + }); + + // two connections join the same group + await handler({ + type: "join", + connectionId: "conn123", + roomName: "test-room", + timestamp: 1000, + }); + + await handler({ + type: "join", + connectionId: "conn456", + roomName: "test-room", + timestamp: 1001, + }); + + // first connection sets state + await handler({ + type: "state", + connectionId: "conn123", + roomName: "test-room", + timestamp: 1002, + state: { status: "typing" }, + }); + + onUpdate.mockClear(); + + // second connection sets state with newer timestamp + await handler({ + type: "state", + connectionId: "conn456", + roomName: "test-room", + timestamp: 1003, + state: { status: "idle" }, + }); + + expect(onUpdate).toHaveBeenCalledTimes(1); + + expect(onUpdate).toHaveBeenCalled(); + const groupMap = onUpdate.mock.calls![0]![0] as Map; + const group = groupMap.get("group:test"); + // should update to new state + expect(group.state).toEqual({ status: "idle" }); + expect(group.timestamp).toBe(1003); + // representative should change + expect(group.representative).toBe("conn456"); + }); +});