mirror of
https://github.com/nvms/prsm.git
synced 2025-12-15 15:50:53 +00:00
feature: ephemeral, room-scoped presence states
This commit is contained in:
parent
21dd9ccf7e
commit
fe163324df
@ -2,43 +2,48 @@
|
||||
|
||||
Mesh is a command-based WebSocket framework for real-time applications. It uses Redis to coordinate connections, rooms, presence, and shared state across application instances, with built-in support for structured commands, latency tracking, and automatic reconnection.
|
||||
|
||||
* [Quickstart](#quickstart)
|
||||
* [Server](#server)
|
||||
* [Client](#client)
|
||||
* [Next steps](#next-steps)
|
||||
* [Who is this for?](#who-is-this-for)
|
||||
* [Distributed messaging architecture](#distributed-messaging-architecture)
|
||||
* [Redis channel subscriptions](#redis-channel-subscriptions)
|
||||
* [Server configuration](#server-configuration)
|
||||
* [Server publishing](#server-publishing)
|
||||
* [Client usage](#client-usage)
|
||||
* [Rooms](#rooms)
|
||||
* [Joining a room](#joining-a-room)
|
||||
* [Leaving a room](#leaving-a-room)
|
||||
* [Server API](#server-api)
|
||||
* [Access control](#access-control)
|
||||
* [Presence](#presence)
|
||||
* [Server configuration](#server-configuration-1)
|
||||
* [Getting presence information (server-side)](#getting-presence-information-server-side)
|
||||
* [Client usage](#client-usage-1)
|
||||
* [Combining presence with user info](#combining-presence-with-user-info)
|
||||
* [Metadata](#metadata)
|
||||
* [Room metadata](#room-metadata)
|
||||
* [Record subscriptions](#record-subscriptions)
|
||||
* [Server configuration](#server-configuration-2)
|
||||
* [Server configuration (writable)](#server-configuration-writable)
|
||||
* [Updating records (server-side)](#updating-records-server-side)
|
||||
* [Updating records (client-side)](#updating-records-client-side)
|
||||
* [Client usage — full mode (default)](#client-usage--full-mode-default)
|
||||
* [Client usage — patch mode](#client-usage--patch-mode)
|
||||
* [Unsubscribing](#unsubscribing)
|
||||
* [Versioning and resync](#versioning-and-resync)
|
||||
* [Why you probably don't need client-side diffing](#why-you-probably-dont-need-client-side-diffing)
|
||||
* [Command middleware](#command-middleware)
|
||||
* [Latency tracking and connection liveness](#latency-tracking-and-connection-liveness)
|
||||
* [Server-side configuration](#server-side-configuration)
|
||||
* [Client-side configuration](#client-side-configuration)
|
||||
* [Comparison](#comparison)
|
||||
- [Quickstart](#quickstart)
|
||||
- [Server](#server)
|
||||
- [Client](#client)
|
||||
- [Next steps](#next-steps)
|
||||
- [Who is this for?](#who-is-this-for)
|
||||
- [Distributed messaging architecture](#distributed-messaging-architecture)
|
||||
- [Redis channel subscriptions](#redis-channel-subscriptions)
|
||||
- [Server configuration](#server-configuration)
|
||||
- [Server publishing](#server-publishing)
|
||||
- [Client usage](#client-usage)
|
||||
- [Rooms](#rooms)
|
||||
- [Joining a room](#joining-a-room)
|
||||
- [Leaving a room](#leaving-a-room)
|
||||
- [Server API](#server-api)
|
||||
- [Access control](#access-control)
|
||||
- [Presence](#presence)
|
||||
- [Server configuration](#server-configuration-1)
|
||||
- [Client usage](#client-usage-1)
|
||||
- [Getting presence information (server-side)](#getting-presence-information-server-side)
|
||||
- [Disabling auto-cleanup (optional)](#disabling-auto-cleanup-optional)
|
||||
- [Combining presence with user info](#combining-presence-with-user-info)
|
||||
- [Presence state](#presence-state)
|
||||
- [Client API](#client-api)
|
||||
- [Server behavior](#server-behavior)
|
||||
- [Receiving presence state updates](#receiving-presence-state-updates)
|
||||
- [Metadata](#metadata)
|
||||
- [Room metadata](#room-metadata)
|
||||
- [Record subscriptions](#record-subscriptions)
|
||||
- [Server configuration](#server-configuration-2)
|
||||
- [Server configuration (writable)](#server-configuration-writable)
|
||||
- [Updating records (server-side)](#updating-records-server-side)
|
||||
- [Updating records (client-side)](#updating-records-client-side)
|
||||
- [Client usage — full mode (default)](#client-usage--full-mode-default)
|
||||
- [Client usage — patch mode](#client-usage--patch-mode)
|
||||
- [Unsubscribing](#unsubscribing)
|
||||
- [Versioning and resync](#versioning-and-resync)
|
||||
- [Why you probably don't need client-side diffing](#why-you-probably-dont-need-client-side-diffing)
|
||||
- [Command middleware](#command-middleware)
|
||||
- [Latency tracking and connection liveness](#latency-tracking-and-connection-liveness)
|
||||
- [Server-side configuration](#server-side-configuration)
|
||||
- [Client-side configuration](#client-side-configuration)
|
||||
- [Comparison](#comparison)
|
||||
|
||||
## Quickstart
|
||||
|
||||
@ -319,7 +324,7 @@ This gives you full flexibility to enforce auth, roles, or custom logic per room
|
||||
Mesh provides a built-in presence system that tracks which connections are present in specific rooms and notifies clients when connections join or leave. This is ideal for building features like "who's online" indicators, or any real-time awareness of other users.
|
||||
|
||||
> [!NOTE]
|
||||
> Presence only tracks *connection IDs*, not metadata. You must join them explicitly if you want to show e.g. usernames, avatars, emails, etc.
|
||||
> Presence only tracks _connection IDs_, not metadata. You must join them explicitly if you want to show e.g. usernames, avatars, emails, etc.
|
||||
|
||||
### Server configuration
|
||||
|
||||
@ -339,7 +344,7 @@ server.trackPresence("admin-room", {
|
||||
guard: async (conn, roomName) => {
|
||||
const meta = await server.connectionManager.getMetadata(conn);
|
||||
return meta?.isAdmin === true;
|
||||
}
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
@ -415,7 +420,7 @@ server.onConnection(async (connection) => {
|
||||
await server.connectionManager.setMetadata(connection, {
|
||||
userId: "user123",
|
||||
username: "Alice",
|
||||
avatar: "https://example.com/avatar.png"
|
||||
avatar: "https://example.com/avatar.png",
|
||||
});
|
||||
});
|
||||
```
|
||||
@ -427,9 +432,9 @@ const { success, present } = await client.subscribePresence(
|
||||
"lobby",
|
||||
async (update) => {
|
||||
const metadata = await client.command("get-user-metadata", {
|
||||
connectionId: update.connectionId
|
||||
connectionId: update.connectionId,
|
||||
});
|
||||
|
||||
|
||||
if (update.type === "join") {
|
||||
console.log(`${metadata.username} joined the lobby`);
|
||||
} else if (update.type === "leave") {
|
||||
@ -443,7 +448,9 @@ To resolve all present users:
|
||||
|
||||
```ts
|
||||
const allMetadata = await Promise.all(
|
||||
present.map((connectionId) => client.command("get-user-metadata", { connectionId }))
|
||||
present.map((connectionId) =>
|
||||
client.command("get-user-metadata", { connectionId })
|
||||
)
|
||||
);
|
||||
|
||||
// [{ userId: "user123", username: "Alice", avatar: "..." }, ...]
|
||||
@ -452,6 +459,81 @@ const allMetadata = await Promise.all(
|
||||
> [!TIP]
|
||||
> You can expose a `get-user-metadata` command on the server that reads from `connectionManager.getMetadata(...)` to support this.
|
||||
|
||||
### Presence state
|
||||
|
||||
In addition to tracking who is present in a room, Mesh lets clients publish **custom ephemeral presence states** — things like `"typing"`, `"away"`, `"drawing"`, etc.
|
||||
|
||||
Presence states are:
|
||||
|
||||
- Scoped per connection, per room
|
||||
- Completely defined by your app (no built-in types)
|
||||
- Optionally ephemeral via client-defined expiration
|
||||
- Broadcast to all clients subscribed to presence in that room
|
||||
|
||||
> [!NOTE]
|
||||
> Presence states are separate from connection metadata. States are transient and lightweight.
|
||||
|
||||
#### Client API
|
||||
|
||||
To publish a presence state:
|
||||
|
||||
```ts
|
||||
await client.publishPresenceState("room:lobby", {
|
||||
state: { status: "typing", field: "title" },
|
||||
expireAfter: 8000, // optional (ms)
|
||||
});
|
||||
```
|
||||
|
||||
To clear the state manually before it expires:
|
||||
|
||||
```ts
|
||||
await client.clearPresenceState("room:lobby");
|
||||
```
|
||||
|
||||
#### Server behavior
|
||||
|
||||
- The state is stored in Redis under
|
||||
`mesh:presence:state:{room}:{connectionId}`
|
||||
- If `ts expireAfter ` is set, it’s stored with a TTL (using Redis PX)
|
||||
- If the key expires:
|
||||
- The state is deleted
|
||||
- A `ts { type: "state", state: null } ` event is broadcast to subscribed clients so they can do any necessary UI updates
|
||||
- If the client clears it manually, the same event is emitted immediately
|
||||
- If the client leaves the room or disconnects, their state is cleared
|
||||
|
||||
#### Receiving presence state updates
|
||||
|
||||
Clients already using `ts subscribePresence(...) ` will receive `state` updates:
|
||||
|
||||
```ts
|
||||
const { success, present, states } = await client.subscribePresence(
|
||||
"room:lobby",
|
||||
(update) => {
|
||||
if (update.type === "state") {
|
||||
if (update.state) {
|
||||
console.log(`${update.connectionId} is now`, update.state);
|
||||
} else {
|
||||
console.log(`${update.connectionId}'s state was cleared`);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
```
|
||||
|
||||
The `ts states ` object includes the currently active presence states at the time of subscription.
|
||||
|
||||
Example:
|
||||
|
||||
```ts
|
||||
{
|
||||
"abc123": { status: "typing", field: "title" },
|
||||
"def456": { status: "away" }
|
||||
}
|
||||
```
|
||||
|
||||
> [!TIP]
|
||||
> This is a low-level signaling primitive. You decide what presence states mean in your app. Mesh just relays and cleans them up.
|
||||
|
||||
### Metadata
|
||||
|
||||
You can associate data like user IDs, tokens, or custom attributes with a connection using the `setMetadata` method. This metadata is stored in Redis, making it ideal for identifying users, managing permissions, or persisting session-related data across a distributed setup. Since it lives in Redis, it’s accessible from any server instance.
|
||||
@ -820,6 +902,8 @@ Together, this system provides end-to-end connection liveness guarantees without
|
||||
| **Raw Events Support** | ✅ | ✅ | ⚠️ Limited | ✅ | ✅ | ✅ |
|
||||
| **Room Support** | ✅ | ✅ | ✅ | ✅ | ⚠️ DIY | ⚠️ Manual |
|
||||
| **Presence Tracking** | ✅ Built-in | ⚠️ Manual | ✅ | ✅ | ❌ | ❌ |
|
||||
| **Presence State** | ✅ Client-defined | ❌ | ❌ | ⚠️ Records | ❌ | ❌ |
|
||||
| **Presence Expiration** | ✅ TTL + silent cleanup | ⚠️ Manual | ❌ | ❌ | ❌ | ❌ |
|
||||
| **Redis Scaling** | ✅ Native | ✅ With adapter | ✅ | ✅ | ✅ If added | ❌ |
|
||||
| **Connection Metadata** | ✅ Redis-backed | ⚠️ Manual | ⚠️ Limited | ✅ Records | ❌ | ❌ |
|
||||
| **Latency Tracking** | ✅ Built-in | ⚠️ Manual | ❌ | ❌ | ❌ | ❌ |
|
||||
|
||||
@ -11,7 +11,8 @@ export { applyPatch } from "fast-json-patch";
|
||||
|
||||
export type PresenceUpdate =
|
||||
| { type: "join"; connectionId: string }
|
||||
| { type: "leave"; connectionId: string };
|
||||
| { type: "leave"; connectionId: string }
|
||||
| { type: "state"; connectionId: string; state: Record<string, any> | null };
|
||||
|
||||
export type PresenceUpdateCallback = (
|
||||
update: PresenceUpdate
|
||||
@ -87,10 +88,11 @@ export class MeshClient extends EventEmitter {
|
||||
private presenceSubscriptions: Map<
|
||||
string, // roomName
|
||||
(update: {
|
||||
type: "join" | "leave";
|
||||
type: "join" | "leave" | "state";
|
||||
connectionId: string;
|
||||
roomName: string;
|
||||
timestamp: number;
|
||||
state?: Record<string, any> | null;
|
||||
metadata?: any;
|
||||
}) => void | Promise<void>
|
||||
> = new Map();
|
||||
@ -394,10 +396,11 @@ export class MeshClient extends EventEmitter {
|
||||
}
|
||||
|
||||
private async handlePresenceUpdate(payload: {
|
||||
type: "join" | "leave";
|
||||
type: "join" | "leave" | "state";
|
||||
connectionId: string;
|
||||
roomName: string;
|
||||
timestamp: number;
|
||||
state?: Record<string, any> | null;
|
||||
metadata?: any;
|
||||
}) {
|
||||
const { roomName } = payload;
|
||||
@ -655,25 +658,30 @@ export class MeshClient extends EventEmitter {
|
||||
* Subscribes to presence updates for a specific room.
|
||||
*
|
||||
* @param {string} roomName - The name of the room to subscribe to presence updates for.
|
||||
* @param {(update: { type: "join" | "leave"; connectionId: string; roomName: string; timestamp: number; metadata?: any }) => void | Promise<void>} callback - Function called on presence updates.
|
||||
* @returns {Promise<{ success: boolean; present: string[] }>} Initial state of presence in the room.
|
||||
* @param {PresenceUpdateCallback} callback - Function called on presence updates.
|
||||
* @returns {Promise<{ success: boolean; present: string[]; states?: Record<string, Record<string, any>> }>} Initial state of presence in the room.
|
||||
*/
|
||||
async subscribePresence(
|
||||
roomName: string,
|
||||
callback: PresenceUpdateCallback
|
||||
): Promise<{ success: boolean; present: string[] }> {
|
||||
): Promise<{
|
||||
success: boolean;
|
||||
present: string[];
|
||||
states?: Record<string, Record<string, any>>;
|
||||
}> {
|
||||
try {
|
||||
const result = await this.command("mesh/subscribe-presence", {
|
||||
roomName,
|
||||
});
|
||||
|
||||
if (result.success) {
|
||||
this.presenceSubscriptions.set(roomName, callback);
|
||||
this.presenceSubscriptions.set(roomName, callback as any);
|
||||
}
|
||||
|
||||
return {
|
||||
success: result.success,
|
||||
present: result.present || [],
|
||||
states: result.states || {},
|
||||
};
|
||||
} catch (error) {
|
||||
console.error(
|
||||
@ -707,4 +715,55 @@ export class MeshClient extends EventEmitter {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Publishes a presence state for the current client in a room
|
||||
*
|
||||
* @param {string} roomName - The name of the room
|
||||
* @param {object} options - Options including state and optional TTL
|
||||
* @param {Record<string, any>} options.state - The state object to publish
|
||||
* @param {number} [options.expireAfter] - Optional TTL in milliseconds
|
||||
* @returns {Promise<boolean>} True if successful, false otherwise
|
||||
*/
|
||||
async publishPresenceState(
|
||||
roomName: string,
|
||||
options: {
|
||||
state: Record<string, any>;
|
||||
expireAfter?: number; // optional, in milliseconds
|
||||
}
|
||||
): Promise<boolean> {
|
||||
try {
|
||||
return await this.command("mesh/publish-presence-state", {
|
||||
roomName,
|
||||
state: options.state,
|
||||
expireAfter: options.expireAfter,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error(
|
||||
`[MeshClient] Failed to publish presence state for room ${roomName}:`,
|
||||
error
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the presence state for the current client in a room
|
||||
*
|
||||
* @param {string} roomName - The name of the room
|
||||
* @returns {Promise<boolean>} True if successful, false otherwise
|
||||
*/
|
||||
async clearPresenceState(roomName: string): Promise<boolean> {
|
||||
try {
|
||||
return await this.command("mesh/clear-presence-state", {
|
||||
roomName,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error(
|
||||
`[MeshClient] Failed to clear presence state for room ${roomName}:`,
|
||||
error
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,8 @@ export class PresenceManager {
|
||||
}
|
||||
|
||||
private readonly PRESENCE_KEY_PATTERN = /^mesh:presence:room:(.+):conn:(.+)$/;
|
||||
private readonly PRESENCE_STATE_KEY_PATTERN =
|
||||
/^mesh:presence:state:(.+):conn:(.+)$/;
|
||||
private trackedRooms: ChannelPattern[] = [];
|
||||
private roomGuards: Map<
|
||||
ChannelPattern,
|
||||
@ -35,7 +37,7 @@ export class PresenceManager {
|
||||
this.roomManager = roomManager;
|
||||
this.redisManager = redisManager;
|
||||
this.presenceExpirationEventsEnabled = enableExpirationEvents;
|
||||
|
||||
|
||||
if (this.presenceExpirationEventsEnabled) {
|
||||
this.subscribeToExpirationEvents();
|
||||
}
|
||||
@ -48,9 +50,12 @@ export class PresenceManager {
|
||||
const { subClient } = this.redisManager;
|
||||
const pattern = this.getExpiredEventsPattern();
|
||||
subClient.psubscribe(pattern);
|
||||
|
||||
|
||||
subClient.on("pmessage", (pattern, channel, key) => {
|
||||
if (this.PRESENCE_KEY_PATTERN.test(key)) {
|
||||
if (
|
||||
this.PRESENCE_KEY_PATTERN.test(key) ||
|
||||
this.PRESENCE_STATE_KEY_PATTERN.test(key)
|
||||
) {
|
||||
this.handleExpiredKey(key);
|
||||
}
|
||||
});
|
||||
@ -61,11 +66,21 @@ export class PresenceManager {
|
||||
*/
|
||||
private async handleExpiredKey(key: string): Promise<void> {
|
||||
try {
|
||||
const match = key.match(this.PRESENCE_KEY_PATTERN);
|
||||
// Check if it's a presence key
|
||||
let match = key.match(this.PRESENCE_KEY_PATTERN);
|
||||
if (match && match[1] && match[2]) {
|
||||
const roomName = match[1];
|
||||
const connectionId = match[2];
|
||||
await this.markOffline(connectionId, roomName);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if it's a presence state key
|
||||
match = key.match(this.PRESENCE_STATE_KEY_PATTERN);
|
||||
if (match && match[1] && match[2]) {
|
||||
const roomName = match[1];
|
||||
const connectionId = match[2];
|
||||
await this.publishPresenceStateUpdate(roomName, connectionId, null);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error("[PresenceManager] Failed to handle expired key:", err);
|
||||
@ -158,6 +173,10 @@ export class PresenceManager {
|
||||
return `mesh:presence:room:${roomName}:conn:${connectionId}`;
|
||||
}
|
||||
|
||||
private presenceStateKey(roomName: string, connectionId: string): string {
|
||||
return `mesh:presence:state:${roomName}:conn:${connectionId}`;
|
||||
}
|
||||
|
||||
async markOnline(connectionId: string, roomName: string): Promise<void> {
|
||||
const roomKey = this.presenceRoomKey(roomName);
|
||||
const connKey = this.presenceConnectionKey(roomName, connectionId);
|
||||
@ -175,10 +194,12 @@ export class PresenceManager {
|
||||
async markOffline(connectionId: string, roomName: string): Promise<void> {
|
||||
const roomKey = this.presenceRoomKey(roomName);
|
||||
const connKey = this.presenceConnectionKey(roomName, connectionId);
|
||||
const stateKey = this.presenceStateKey(roomName, connectionId);
|
||||
|
||||
const pipeline = this.redis.pipeline();
|
||||
pipeline.srem(roomKey, connectionId);
|
||||
pipeline.del(connKey);
|
||||
pipeline.del(stateKey);
|
||||
await pipeline.exec();
|
||||
|
||||
await this.publishPresenceUpdate(roomName, connectionId, "leave");
|
||||
@ -211,6 +232,149 @@ export class PresenceManager {
|
||||
await this.redis.publish(channel, message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Publishes a presence state for a connection in a room
|
||||
*
|
||||
* @param connectionId The ID of the connection
|
||||
* @param roomName The name of the room
|
||||
* @param state The state object to publish
|
||||
* @param expireAfter Optional TTL in milliseconds
|
||||
*/
|
||||
async publishPresenceState(
|
||||
connectionId: string,
|
||||
roomName: string,
|
||||
state: Record<string, any>,
|
||||
expireAfter?: number
|
||||
): Promise<void> {
|
||||
const key = this.presenceStateKey(roomName, connectionId);
|
||||
const value = JSON.stringify(state);
|
||||
|
||||
const pipeline = this.redis.pipeline();
|
||||
|
||||
if (expireAfter && expireAfter > 0) {
|
||||
pipeline.set(key, value, "PX", expireAfter);
|
||||
} else {
|
||||
pipeline.set(key, value);
|
||||
}
|
||||
|
||||
await pipeline.exec();
|
||||
await this.publishPresenceStateUpdate(roomName, connectionId, state);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the presence state for a connection in a room
|
||||
*
|
||||
* @param connectionId The ID of the connection
|
||||
* @param roomName The name of the room
|
||||
*/
|
||||
async clearPresenceState(
|
||||
connectionId: string,
|
||||
roomName: string
|
||||
): Promise<void> {
|
||||
const key = this.presenceStateKey(roomName, connectionId);
|
||||
await this.redis.del(key);
|
||||
await this.publishPresenceStateUpdate(roomName, connectionId, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the current presence state for a connection in a room
|
||||
*
|
||||
* @param connectionId The ID of the connection
|
||||
* @param roomName The name of the room
|
||||
* @returns The presence state or null if not found
|
||||
*/
|
||||
async getPresenceState(
|
||||
connectionId: string,
|
||||
roomName: string
|
||||
): Promise<Record<string, any> | null> {
|
||||
const key = this.presenceStateKey(roomName, connectionId);
|
||||
const value = await this.redis.get(key);
|
||||
|
||||
if (!value) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return JSON.parse(value);
|
||||
} catch (e) {
|
||||
console.error(`[PresenceManager] Failed to parse presence state: ${e}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets all presence states for a room
|
||||
*
|
||||
* @param roomName The name of the room
|
||||
* @returns A map of connection IDs to their presence states
|
||||
*/
|
||||
async getAllPresenceStates(
|
||||
roomName: string
|
||||
): Promise<Map<string, Record<string, any>>> {
|
||||
const result = new Map<string, Record<string, any>>();
|
||||
const connections = await this.getPresentConnections(roomName);
|
||||
|
||||
if (connections.length === 0) {
|
||||
return result;
|
||||
}
|
||||
|
||||
const pipeline = this.redis.pipeline();
|
||||
|
||||
for (const connectionId of connections) {
|
||||
pipeline.get(this.presenceStateKey(roomName, connectionId));
|
||||
}
|
||||
|
||||
const responses = await pipeline.exec();
|
||||
|
||||
if (!responses) {
|
||||
return result;
|
||||
}
|
||||
|
||||
for (let i = 0; i < connections.length; i++) {
|
||||
const connectionId = connections[i];
|
||||
if (!connectionId) continue;
|
||||
|
||||
const [err, value] = responses[i] || [];
|
||||
|
||||
if (err || !value) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
const state = JSON.parse(value as string);
|
||||
result.set(connectionId, state);
|
||||
} catch (e) {
|
||||
console.error(`[PresenceManager] Failed to parse presence state: ${e}`);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Publishes a presence state update to Redis
|
||||
*
|
||||
* @param roomName The name of the room
|
||||
* @param connectionId The ID of the connection
|
||||
* @param state The state object or null
|
||||
*/
|
||||
private async publishPresenceStateUpdate(
|
||||
roomName: string,
|
||||
connectionId: string,
|
||||
state: Record<string, any> | null
|
||||
): Promise<void> {
|
||||
const channel = `mesh:presence:updates:${roomName}`;
|
||||
const message = JSON.stringify({
|
||||
type: "state",
|
||||
connectionId,
|
||||
roomName,
|
||||
state,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
|
||||
await this.redis.publish(channel, message);
|
||||
}
|
||||
|
||||
async cleanupConnection(connection: Connection): Promise<void> {
|
||||
const connectionId = connection.id;
|
||||
const rooms = await this.roomManager.getRoomsForConnection(connectionId);
|
||||
|
||||
@ -79,8 +79,14 @@ export class MeshServer extends WebSocketServer {
|
||||
this.serverOptions.enablePresenceExpirationEvents
|
||||
);
|
||||
if (this.serverOptions.enablePresenceExpirationEvents) {
|
||||
this.redisManager.enableKeyspaceNotifications()
|
||||
.catch(err => this.emit("error", new Error(`Failed to enable keyspace notifications: ${err}`)));
|
||||
this.redisManager
|
||||
.enableKeyspaceNotifications()
|
||||
.catch((err) =>
|
||||
this.emit(
|
||||
"error",
|
||||
new Error(`Failed to enable keyspace notifications: ${err}`)
|
||||
)
|
||||
);
|
||||
}
|
||||
this.commandManager = new CommandManager((err) => this.emit("error", err));
|
||||
this.channelManager = new ChannelManager(
|
||||
@ -647,7 +653,11 @@ export class MeshServer extends WebSocketServer {
|
||||
|
||||
this.exposeCommand<
|
||||
{ roomName: string },
|
||||
{ success: boolean; present: string[] }
|
||||
{
|
||||
success: boolean;
|
||||
present: string[];
|
||||
states?: Record<string, Record<string, any>>;
|
||||
}
|
||||
>("mesh/subscribe-presence", async (ctx) => {
|
||||
const { roomName } = ctx.payload;
|
||||
|
||||
@ -668,7 +678,21 @@ export class MeshServer extends WebSocketServer {
|
||||
roomName
|
||||
);
|
||||
|
||||
return { success: true, present };
|
||||
// get all presence states for the room
|
||||
const statesMap = await this.presenceManager.getAllPresenceStates(
|
||||
roomName
|
||||
);
|
||||
const states: Record<string, Record<string, any>> = {};
|
||||
|
||||
statesMap.forEach((state, connectionId) => {
|
||||
states[connectionId] = state;
|
||||
});
|
||||
|
||||
return {
|
||||
success: true,
|
||||
present,
|
||||
states,
|
||||
};
|
||||
} catch (e) {
|
||||
console.error(
|
||||
`Failed to subscribe to presence for room ${roomName}:`,
|
||||
@ -689,6 +713,72 @@ export class MeshServer extends WebSocketServer {
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
this.exposeCommand<
|
||||
{ roomName: string; state: Record<string, any>; expireAfter?: number },
|
||||
boolean
|
||||
>("mesh/publish-presence-state", async (ctx) => {
|
||||
const { roomName, state, expireAfter } = ctx.payload;
|
||||
const connectionId = ctx.connection.id;
|
||||
|
||||
if (!state) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// ensure presence is tracked for this room and the connection is in the room
|
||||
if (
|
||||
!(await this.presenceManager.isRoomTracked(roomName, ctx.connection)) ||
|
||||
!(await this.isInRoom(roomName, connectionId))
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
await this.presenceManager.publishPresenceState(
|
||||
connectionId,
|
||||
roomName,
|
||||
state,
|
||||
expireAfter
|
||||
);
|
||||
return true;
|
||||
} catch (e) {
|
||||
console.error(
|
||||
`Failed to publish presence state for room ${roomName}:`,
|
||||
e
|
||||
);
|
||||
return false;
|
||||
}
|
||||
});
|
||||
|
||||
this.exposeCommand<{ roomName: string }, boolean>(
|
||||
"mesh/clear-presence-state",
|
||||
async (ctx) => {
|
||||
const { roomName } = ctx.payload;
|
||||
const connectionId = ctx.connection.id;
|
||||
|
||||
// ensure presence is tracked for this room and the connection is in the room
|
||||
if (
|
||||
!(await this.presenceManager.isRoomTracked(
|
||||
roomName,
|
||||
ctx.connection
|
||||
)) ||
|
||||
!(await this.isInRoom(roomName, connectionId))
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
await this.presenceManager.clearPresenceState(connectionId, roomName);
|
||||
return true;
|
||||
} catch (e) {
|
||||
console.error(
|
||||
`Failed to clear presence state for room ${roomName}:`,
|
||||
e
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// #endregion
|
||||
|
||||
388
packages/mesh/src/tests/presence-state.test.ts
Normal file
388
packages/mesh/src/tests/presence-state.test.ts
Normal file
@ -0,0 +1,388 @@
|
||||
import { describe, test, expect, beforeEach, afterEach, vi } from "vitest";
|
||||
import Redis from "ioredis";
|
||||
import { MeshServer } from "../server";
|
||||
import { MeshClient } from "../client";
|
||||
|
||||
const REDIS_HOST = process.env.REDIS_HOST || "127.0.0.1";
|
||||
const REDIS_PORT = process.env.REDIS_PORT
|
||||
? parseInt(process.env.REDIS_PORT, 10)
|
||||
: 6379;
|
||||
|
||||
const createTestServer = (port: number) =>
|
||||
new MeshServer({
|
||||
port,
|
||||
redisOptions: {
|
||||
host: REDIS_HOST,
|
||||
port: REDIS_PORT,
|
||||
},
|
||||
pingInterval: 1000,
|
||||
latencyInterval: 500,
|
||||
enablePresenceExpirationEvents: true,
|
||||
});
|
||||
|
||||
const flushRedis = async () => {
|
||||
const redis = new Redis({ host: REDIS_HOST, port: REDIS_PORT });
|
||||
await redis.flushdb();
|
||||
await redis.quit();
|
||||
};
|
||||
|
||||
const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
|
||||
|
||||
describe("Presence State", () => {
|
||||
const port = 8150;
|
||||
let server: MeshServer;
|
||||
let client1: MeshClient;
|
||||
let client2: MeshClient;
|
||||
|
||||
beforeEach(async () => {
|
||||
await flushRedis();
|
||||
|
||||
server = createTestServer(port);
|
||||
server.trackPresence(/^test:room:.*/);
|
||||
await server.ready();
|
||||
|
||||
client1 = new MeshClient(`ws://localhost:${port}`);
|
||||
client2 = new MeshClient(`ws://localhost:${port}`);
|
||||
|
||||
await client1.connect();
|
||||
await client2.connect();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await client1.close();
|
||||
await client2.close();
|
||||
await server.close();
|
||||
});
|
||||
|
||||
test("client can publish and receive presence states", async () => {
|
||||
const roomName = "test:room:states";
|
||||
|
||||
const updates: any[] = [];
|
||||
const callback = vi.fn((update: any) => {
|
||||
updates.push(update);
|
||||
});
|
||||
|
||||
const { success, present, states } = await client1.subscribePresence(
|
||||
roomName,
|
||||
callback
|
||||
);
|
||||
expect(success).toBe(true);
|
||||
expect(states).toEqual({});
|
||||
|
||||
await client2.joinRoom(roomName);
|
||||
await wait(100);
|
||||
|
||||
const state = { status: "typing" };
|
||||
const publishSuccess = await client2.publishPresenceState(roomName, {
|
||||
state,
|
||||
});
|
||||
expect(publishSuccess).toBe(true);
|
||||
await wait(100);
|
||||
|
||||
// check that client1 got the state update
|
||||
expect(callback).toHaveBeenCalledTimes(2); // join + state
|
||||
expect(updates[1].type).toBe("state");
|
||||
expect(typeof updates[1].connectionId).toBe("string");
|
||||
expect(updates[1].state).toEqual(state);
|
||||
|
||||
const clearSuccess = await client2.clearPresenceState(roomName);
|
||||
expect(clearSuccess).toBe(true);
|
||||
await wait(100);
|
||||
|
||||
// check that client1 got the state clear
|
||||
expect(callback).toHaveBeenCalledTimes(3);
|
||||
expect(updates[2].type).toBe("state");
|
||||
expect(updates[2].state).toBeNull();
|
||||
});
|
||||
|
||||
test("presence state expires after TTL", async () => {
|
||||
const roomName = "test:room:state-ttl";
|
||||
const shortTTL = 200; // 200ms
|
||||
|
||||
const updates: any[] = [];
|
||||
const callback = vi.fn((update: any) => {
|
||||
updates.push(update);
|
||||
});
|
||||
|
||||
await client1.subscribePresence(roomName, callback);
|
||||
await client2.joinRoom(roomName);
|
||||
await wait(100);
|
||||
|
||||
// publish with short ttl
|
||||
const state = { status: "typing" };
|
||||
await client2.publishPresenceState(roomName, {
|
||||
state,
|
||||
expireAfter: shortTTL,
|
||||
});
|
||||
await wait(100);
|
||||
|
||||
// check that client1 got the update
|
||||
expect(callback).toHaveBeenCalledTimes(2); // join + state
|
||||
expect(updates[1].type).toBe("state");
|
||||
expect(updates[1].state).toEqual(state);
|
||||
|
||||
// wait for ttl to expire
|
||||
await wait(shortTTL + 100);
|
||||
|
||||
// check that client1 got the expiration
|
||||
expect(callback).toHaveBeenCalledTimes(3);
|
||||
expect(updates[2].type).toBe("state");
|
||||
expect(updates[2].state).toBeNull();
|
||||
});
|
||||
|
||||
test("initial presence subscription includes current states", async () => {
|
||||
const roomName = "test:room:initial-states";
|
||||
|
||||
await client1.joinRoom(roomName);
|
||||
const state1 = { status: "online", activity: "coding" };
|
||||
await client1.publishPresenceState(roomName, { state: state1 });
|
||||
|
||||
await client2.joinRoom(roomName);
|
||||
const state2 = { status: "away" };
|
||||
await client2.publishPresenceState(roomName, { state: state2 });
|
||||
|
||||
await wait(100);
|
||||
|
||||
const client3 = new MeshClient(`ws://localhost:${port}`);
|
||||
await client3.connect();
|
||||
|
||||
const callback = vi.fn();
|
||||
const { success, present, states } = await client3.subscribePresence(
|
||||
roomName,
|
||||
callback
|
||||
);
|
||||
|
||||
expect(success).toBe(true);
|
||||
expect(present.length).toBe(2);
|
||||
expect(Object.keys(states || {}).length).toBe(2);
|
||||
|
||||
// make sure states include both clients
|
||||
const connections = server.connectionManager.getLocalConnections();
|
||||
const client1Id = connections[0]?.id!;
|
||||
const client2Id = connections[1]?.id!;
|
||||
|
||||
expect(states?.[client1Id]).toEqual(state1);
|
||||
expect(states?.[client2Id]).toEqual(state2);
|
||||
|
||||
await client3.close();
|
||||
});
|
||||
|
||||
test("presence state is cleared when client leaves room", async () => {
|
||||
const roomName = "test:room:leave-clear";
|
||||
|
||||
const updates: any[] = [];
|
||||
const callback = vi.fn((update: any) => {
|
||||
updates.push(update);
|
||||
});
|
||||
|
||||
await client1.subscribePresence(roomName, callback);
|
||||
await client2.joinRoom(roomName);
|
||||
await wait(100);
|
||||
|
||||
// publish state
|
||||
const state = { status: "typing" };
|
||||
await client2.publishPresenceState(roomName, { state });
|
||||
await wait(100);
|
||||
|
||||
await client2.leaveRoom(roomName);
|
||||
await wait(100);
|
||||
|
||||
// check that client1 got the leave event
|
||||
expect(callback).toHaveBeenCalledTimes(3); // join + state + leave
|
||||
expect(updates[2].type).toBe("leave");
|
||||
|
||||
// client2 rejoins but should have no state
|
||||
await client2.joinRoom(roomName);
|
||||
await wait(100);
|
||||
|
||||
const { states } = await client1.subscribePresence(roomName, () => {});
|
||||
const connections = server.connectionManager.getLocalConnections();
|
||||
const client2Id = connections.find((c) => c.id !== connections[0]?.id)?.id!;
|
||||
expect(states?.[client2Id]).toBeUndefined();
|
||||
});
|
||||
|
||||
test("presence state is cleared when client disconnects", async () => {
|
||||
const roomName = "test:room:disconnect-clear";
|
||||
|
||||
const connections = server.connectionManager.getLocalConnections();
|
||||
const connection2 = connections[1]!;
|
||||
|
||||
await server.addToRoom(roomName, connection2);
|
||||
|
||||
const state = { status: "typing" };
|
||||
await client2.publishPresenceState(roomName, { state });
|
||||
await wait(100);
|
||||
|
||||
// make sure state exists
|
||||
let statesMap = await server.presenceManager.getAllPresenceStates(roomName);
|
||||
expect(statesMap.has(connection2.id)).toBe(true);
|
||||
expect(statesMap.get(connection2.id)).toEqual(state);
|
||||
|
||||
await client2.close();
|
||||
await wait(200);
|
||||
|
||||
// make sure state is gone
|
||||
statesMap = await server.presenceManager.getAllPresenceStates(roomName);
|
||||
expect(statesMap.has(connection2.id)).toBe(false);
|
||||
}, 10000);
|
||||
|
||||
test("double state overwrite (same connection)", async () => {
|
||||
const roomName = "test:room:overwrite";
|
||||
|
||||
const updates: any[] = [];
|
||||
const callback = vi.fn((update: any) => {
|
||||
updates.push(update);
|
||||
});
|
||||
|
||||
// subscribe to presence
|
||||
await client1.subscribePresence(roomName, callback);
|
||||
await client2.joinRoom(roomName);
|
||||
await wait(100);
|
||||
|
||||
const state1 = { status: "typing" };
|
||||
await client2.publishPresenceState(roomName, { state: state1 });
|
||||
await wait(100);
|
||||
|
||||
// publish another state right away
|
||||
const state2 = { status: "idle" };
|
||||
await client2.publishPresenceState(roomName, { state: state2 });
|
||||
await wait(100);
|
||||
|
||||
// check that client1 got both updates
|
||||
expect(callback).toHaveBeenCalledTimes(3); // join + state1 + state2
|
||||
expect(updates[1].type).toBe("state");
|
||||
expect(updates[1].state).toEqual(state1);
|
||||
expect(updates[2].type).toBe("state");
|
||||
expect(updates[2].state).toEqual(state2);
|
||||
|
||||
// make sure only the latest state is kept
|
||||
const statesMap = await server.presenceManager.getAllPresenceStates(
|
||||
roomName
|
||||
);
|
||||
const connections = server.connectionManager.getLocalConnections();
|
||||
const connection2 = connections[1]!;
|
||||
expect(statesMap.get(connection2.id)).toEqual(state2);
|
||||
});
|
||||
|
||||
test("no state if state key is omitted", async () => {
|
||||
const roomName = "test:room:no-state";
|
||||
await client1.joinRoom(roomName);
|
||||
|
||||
// @ts-ignore - Intentionally passing invalid params to test behavior
|
||||
const result = await client1.publishPresenceState(roomName, {});
|
||||
expect(result).toBe(false);
|
||||
|
||||
// make sure no state was stored
|
||||
const statesMap = await server.presenceManager.getAllPresenceStates(
|
||||
roomName
|
||||
);
|
||||
const connections = server.connectionManager.getLocalConnections();
|
||||
const connection1 = connections[0]!;
|
||||
expect(statesMap.has(connection1.id)).toBe(false);
|
||||
});
|
||||
|
||||
test("error on publishing state to non-tracked room", async () => {
|
||||
const roomName = "untracked:room";
|
||||
await client1.joinRoom(roomName);
|
||||
|
||||
const state = { status: "typing" };
|
||||
const result = await client1.publishPresenceState(roomName, { state });
|
||||
|
||||
// should fail - room isn't tracked
|
||||
expect(result).toBe(false);
|
||||
});
|
||||
|
||||
test("multiple TTL states expire independently", async () => {
|
||||
const roomName = "test:room:multiple-ttl";
|
||||
|
||||
const updates: any[] = [];
|
||||
const callback = vi.fn((update: any) => {
|
||||
updates.push(update);
|
||||
});
|
||||
|
||||
await client1.subscribePresence(roomName, callback);
|
||||
await client1.joinRoom(roomName);
|
||||
await client2.joinRoom(roomName);
|
||||
await wait(100);
|
||||
|
||||
const shortTTL = 200; // 200ms
|
||||
const state1 = { status: "typing" };
|
||||
await client2.publishPresenceState(roomName, {
|
||||
state: state1,
|
||||
expireAfter: shortTTL,
|
||||
});
|
||||
|
||||
const longTTL = 500; // 500ms
|
||||
const state2 = { status: "away" };
|
||||
await client1.publishPresenceState(roomName, {
|
||||
state: state2,
|
||||
expireAfter: longTTL,
|
||||
});
|
||||
|
||||
await wait(100);
|
||||
|
||||
// check both states are stored
|
||||
let statesMap = await server.presenceManager.getAllPresenceStates(roomName);
|
||||
const connections = server.connectionManager.getLocalConnections();
|
||||
const connection1 = connections[0]!;
|
||||
const connection2 = connections[1]!;
|
||||
expect(statesMap.get(connection1.id)).toEqual(state2);
|
||||
expect(statesMap.get(connection2.id)).toEqual(state1);
|
||||
|
||||
await wait(shortTTL + 50);
|
||||
|
||||
// check only the short ttl state expired
|
||||
statesMap = await server.presenceManager.getAllPresenceStates(roomName);
|
||||
expect(statesMap.has(connection2.id)).toBe(false);
|
||||
expect(statesMap.get(connection1.id)).toEqual(state2);
|
||||
|
||||
await wait(longTTL - shortTTL);
|
||||
|
||||
// check both states are now expired
|
||||
statesMap = await server.presenceManager.getAllPresenceStates(roomName);
|
||||
expect(statesMap.has(connection1.id)).toBe(false);
|
||||
expect(statesMap.has(connection2.id)).toBe(false);
|
||||
}, 10000);
|
||||
|
||||
test("room-scoped state isolation", async () => {
|
||||
const room1 = "test:room:isolation-1";
|
||||
const room2 = "test:room:isolation-2";
|
||||
|
||||
await client1.joinRoom(room1);
|
||||
await client1.joinRoom(room2);
|
||||
await wait(100);
|
||||
|
||||
const state1 = { status: "typing", room: "room1" };
|
||||
const state2 = { status: "away", room: "room2" };
|
||||
|
||||
await client1.publishPresenceState(room1, { state: state1 });
|
||||
await client1.publishPresenceState(room2, { state: state2 });
|
||||
await wait(100);
|
||||
|
||||
// check states are tracked separately per room
|
||||
const statesMap1 = await server.presenceManager.getAllPresenceStates(room1);
|
||||
const statesMap2 = await server.presenceManager.getAllPresenceStates(room2);
|
||||
|
||||
const connections = server.connectionManager.getLocalConnections();
|
||||
const connection1 = connections[0]!;
|
||||
|
||||
// each room should have the right state
|
||||
expect(statesMap1.get(connection1.id)).toEqual(state1);
|
||||
expect(statesMap2.get(connection1.id)).toEqual(state2);
|
||||
|
||||
// updating state in one room shouldn't affect the other
|
||||
const updatedState1 = { status: "idle", room: "room1-updated" };
|
||||
await client1.publishPresenceState(room1, { state: updatedState1 });
|
||||
await wait(100);
|
||||
|
||||
const updatedStatesMap1 = await server.presenceManager.getAllPresenceStates(
|
||||
room1
|
||||
);
|
||||
const updatedStatesMap2 = await server.presenceManager.getAllPresenceStates(
|
||||
room2
|
||||
);
|
||||
|
||||
expect(updatedStatesMap1.get(connection1.id)).toEqual(updatedState1);
|
||||
expect(updatedStatesMap2.get(connection1.id)).toEqual(state2); // Still has original state
|
||||
});
|
||||
});
|
||||
Loading…
Reference in New Issue
Block a user