From 58980e9f09be01510c26def8508e6e6337c90f36 Mon Sep 17 00:00:00 2001 From: nvms Date: Mon, 21 Apr 2025 08:37:05 -0400 Subject: [PATCH] fix(presence): add TTL-based expiration cleanup using Redis keyspace notifications --- packages/mesh/README.md | 105 ++++++++++-------- packages/mesh/src/server/managers/presence.ts | 61 +++++++++- packages/mesh/src/server/managers/redis.ts | 22 ++++ packages/mesh/src/server/mesh-server.ts | 8 +- packages/mesh/src/server/types.ts | 8 ++ .../src/tests/presence-subscription.test.ts | 34 ++++++ 6 files changed, 188 insertions(+), 50 deletions(-) diff --git a/packages/mesh/README.md b/packages/mesh/README.md index 7f87cfc..ed03bf3 100644 --- a/packages/mesh/README.md +++ b/packages/mesh/README.md @@ -323,57 +323,42 @@ Mesh provides a built-in presence system that tracks which connections are prese ### Server configuration -Enable presence tracking for specific rooms using exact names or regex patterns: +Enable presence tracking for specific rooms using exact names or regex patterns. You can optionally customize the TTL or restrict access with a guard. ```ts // track presence for all rooms matching a pattern server.trackPresence(/^room:.*$/); -// track presence for a specific room -server.trackPresence("lobby"); - -// guard who can see presence. -// clients who attempt to subscribe to the presence of this room -// will be rejected if the guard returns false -server.trackPresence("admin-room", async (conn, roomName) => { - const meta = await server.connectionManager.getMetadata(conn); - return meta?.isAdmin === true; +// track presence for a specific room with a custom TTL +server.trackPresence("game-room", { + ttl: 60_000, // time in ms before presence entry expires if not refreshed }); -// custom TTL -server.trackPresence("game-room", { ttl: 60_000 }); // ms - -// guard and TTL -server.trackPresence("vip-room", { - ttl: 30_000, +// restrict visibility to admins +server.trackPresence("admin-room", { guard: async (conn, roomName) => { const meta = await server.connectionManager.getMetadata(conn); - return meta?.isVIP === true; + return meta?.isAdmin === true; } }); ``` -When presence tracking is enabled for a room, Mesh automatically: +When presence tracking is enabled: -- Detects and records the connection IDs of clients joining the room. -- Emits real‐time “join” and “leave” events to subscribed clients. -- Automatically refreshes each connection’s presence using a configurable TTL as long as the client remains active. -- Cleans up expired or disconnected entries to maintain an up-to-date presence list. +- Mesh stores connection IDs in Redis with a TTL +- As long as a client remains active, the TTL is automatically refreshed +- When the TTL expires (e.g. due to disconnect or inactivity), Mesh **automatically marks the connection offline** and emits a `leave` event -### Getting presence information (server-side) - -```ts -// get all connections currently present in a room -const connectionIds = await server.presenceManager.getPresentConnections("lobby"); -``` +> [!INFO] +> Under the hood, this uses Redis keyspace notifications to detect expiration events and trigger cleanup. This behavior is enabled by default and can be disabled via the `enablePresenceExpirationEvents` server option. ### Client usage -Subscribe to presence updates for a room: +Subscribe to presence updates: ```ts const { success, present } = await client.subscribePresence( - "lobby", + "room:lobby", (update) => { if (update.type === "join") { console.log("User joined:", update.connectionId); @@ -387,12 +372,38 @@ const { success, present } = await client.subscribePresence( console.log("Currently present:", present); // ["conn1", "conn2", ...] ``` -Unsubscribe when no longer needed: +You'll receive: + +- The current list of `connectionId`s as `present` +- Real-time `"join"` and `"leave"` events as users come and go (or TTL expires) + +Unsubscribe when done: ```ts -await client.unsubscribePresence("lobby"); +await client.unsubscribePresence("room:lobby"); ``` +### Getting presence information (server-side) + +```ts +const ids = await server.presenceManager.getPresentConnections("room:lobby"); +// ["abc123", "def456", ...] +``` + +### Disabling auto-cleanup (optional) + +If for some reason you don't want TTL expirations to trigger `leave` events, you can disable it in your `MeshServer` options: + +```ts +const server = new MeshServer({ + port: 8080, + redisOptions: { host: "localhost", port: 6379 }, + enablePresenceExpirationEvents: false, +}); +``` + +This disables Redis keyspace notifications and requires you to manage stale connections yourself (not recommended). + ### Combining presence with user info Presence is most useful when combined with connection metadata. For example: @@ -407,15 +418,14 @@ server.onConnection(async (connection) => { avatar: "https://example.com/avatar.png" }); }); +``` -// client: subscribe to presence and resolve metadata +Then on the client: + +```ts const { success, present } = await client.subscribePresence( "lobby", async (update) => { - // fetch metadata for the connection that joined/left. - // - // since clients cannot access `getAllMetadataForRoom()` directly (it's just a server API), - // you can expose it via a custom command like `get-user-metadata`: const metadata = await client.command("get-user-metadata", { connectionId: update.connectionId }); @@ -427,16 +437,21 @@ const { success, present } = await client.subscribePresence( } } ); - -// initial presence - fetch metadata for all present connections -const allMetadata = await Promise.all( - present.map(connectionId => - client.command("get-user-metadata", { connectionId }) - ) -); -console.log("Users in lobby:", allMetadata); ``` +To resolve all present users: + +```ts +const allMetadata = await Promise.all( + present.map((connectionId) => client.command("get-user-metadata", { connectionId })) +); + +// [{ userId: "user123", username: "Alice", avatar: "..." }, ...] +``` + +> [!TIP] +> You can expose a `get-user-metadata` command on the server that reads from `connectionManager.getMetadata(...)` to support this. + ### Metadata You can associate data like user IDs, tokens, or custom attributes with a connection using the `setMetadata` method. This metadata is stored in Redis, making it ideal for identifying users, managing permissions, or persisting session-related data across a distributed setup. Since it lives in Redis, it’s accessible from any server instance. diff --git a/packages/mesh/src/server/managers/presence.ts b/packages/mesh/src/server/managers/presence.ts index d72f6e2..3513708 100644 --- a/packages/mesh/src/server/managers/presence.ts +++ b/packages/mesh/src/server/managers/presence.ts @@ -1,12 +1,22 @@ import type { Redis } from "ioredis"; import type { Connection } from "../connection"; import type { RoomManager } from "./room"; +import type { RedisManager } from "./redis"; type ChannelPattern = string | RegExp; export class PresenceManager { private redis: Redis; private roomManager: RoomManager; + private redisManager: RedisManager; + private presenceExpirationEventsEnabled: boolean; + + private getExpiredEventsPattern(): string { + const dbIndex = (this.redis as any).options?.db ?? 0; + return `__keyevent@${dbIndex}__:expired`; + } + + private readonly PRESENCE_KEY_PATTERN = /^mesh:presence:room:(.+):conn:(.+)$/; private trackedRooms: ChannelPattern[] = []; private roomGuards: Map< ChannelPattern, @@ -15,9 +25,51 @@ export class PresenceManager { private roomTTLs: Map = new Map(); private defaultTTL = 30_000; // 30 seconds default TTL - constructor(redis: Redis, roomManager: RoomManager) { + constructor( + redis: Redis, + roomManager: RoomManager, + redisManager: RedisManager, + enableExpirationEvents: boolean = true + ) { this.redis = redis; this.roomManager = roomManager; + this.redisManager = redisManager; + this.presenceExpirationEventsEnabled = enableExpirationEvents; + + if (this.presenceExpirationEventsEnabled) { + this.subscribeToExpirationEvents(); + } + } + + /** + * Subscribes to Redis keyspace notifications for expired presence keys + */ + private subscribeToExpirationEvents(): void { + const { subClient } = this.redisManager; + const pattern = this.getExpiredEventsPattern(); + subClient.psubscribe(pattern); + + subClient.on("pmessage", (pattern, channel, key) => { + if (this.PRESENCE_KEY_PATTERN.test(key)) { + this.handleExpiredKey(key); + } + }); + } + + /** + * Handles an expired key notification + */ + private async handleExpiredKey(key: string): Promise { + try { + const match = key.match(this.PRESENCE_KEY_PATTERN); + if (match && match[1] && match[2]) { + const roomName = match[1]; + const connectionId = match[2]; + await this.markOffline(connectionId, roomName); + } + } catch (err) { + console.error("[PresenceManager] Failed to handle expired key:", err); + } } trackRoom( @@ -113,7 +165,8 @@ export class PresenceManager { const pipeline = this.redis.pipeline(); pipeline.sadd(roomKey, connectionId); - pipeline.set(connKey, "", "EX", Math.floor(ttl / 1000)); + const ttlSeconds = Math.max(1, Math.floor(ttl / 1000)); + pipeline.set(connKey, "", "EX", ttlSeconds); await pipeline.exec(); await this.publishPresenceUpdate(roomName, connectionId, "join"); @@ -134,8 +187,8 @@ export class PresenceManager { async refreshPresence(connectionId: string, roomName: string): Promise { const connKey = this.presenceConnectionKey(roomName, connectionId); const ttl = this.getRoomTTL(roomName); - - await this.redis.set(connKey, "", "EX", Math.floor(ttl / 1000)); + const ttlSeconds = Math.max(1, Math.floor(ttl / 1000)); + await this.redis.set(connKey, "", "EX", ttlSeconds); } async getPresentConnections(roomName: string): Promise { diff --git a/packages/mesh/src/server/managers/redis.ts b/packages/mesh/src/server/managers/redis.ts index cee7915..f29d171 100644 --- a/packages/mesh/src/server/managers/redis.ts +++ b/packages/mesh/src/server/managers/redis.ts @@ -117,4 +117,26 @@ export class RedisManager { set isShuttingDown(value: boolean) { this._isShuttingDown = value; } + + /** + * Enables Redis keyspace notifications for expired events by updating the + * "notify-keyspace-events" configuration. Ensures that both keyevent ('E') + * and expired event ('x') notifications are enabled. If they are not already + * present, the method appends them to the current configuration. + * + * @returns {Promise} A promise that resolves when the configuration has been updated. + * @throws {Error} If the Redis CONFIG commands fail or the connection encounters an error. + */ + async enableKeyspaceNotifications(): Promise { + const result = await this.redis.config("GET", "notify-keyspace-events"); + const currentConfig = + Array.isArray(result) && result.length > 1 ? result[1] : ""; + + // add expired events notification if not already enabled + // 'E' enables keyevent notifications, 'x' enables expired events + let newConfig = currentConfig || ""; + if (!newConfig.includes("E")) newConfig += "E"; + if (!newConfig.includes("x")) newConfig += "x"; + await this.redis.config("SET", "notify-keyspace-events", newConfig); + } } diff --git a/packages/mesh/src/server/mesh-server.ts b/packages/mesh/src/server/mesh-server.ts index 87dfc03..6b94479 100644 --- a/packages/mesh/src/server/mesh-server.ts +++ b/packages/mesh/src/server/mesh-server.ts @@ -74,8 +74,14 @@ export class MeshServer extends WebSocketServer { ); this.presenceManager = new PresenceManager( this.redisManager.redis, - this.roomManager + this.roomManager, + this.redisManager, + this.serverOptions.enablePresenceExpirationEvents ); + if (this.serverOptions.enablePresenceExpirationEvents) { + this.redisManager.enableKeyspaceNotifications() + .catch(err => this.emit("error", new Error(`Failed to enable keyspace notifications: ${err}`))); + } this.commandManager = new CommandManager((err) => this.emit("error", err)); this.channelManager = new ChannelManager( this.redisManager.redis, diff --git a/packages/mesh/src/server/types.ts b/packages/mesh/src/server/types.ts index 589bc1a..cedda6d 100644 --- a/packages/mesh/src/server/types.ts +++ b/packages/mesh/src/server/types.ts @@ -37,6 +37,14 @@ export type MeshServerOptions = ServerOptions & { latencyInterval?: number; redisOptions: RedisOptions; + /** + * Whether to enable Redis keyspace notifications for presence expiration. + * When enabled, connections will be automatically marked as offline when their presence TTL expires. + * + * @default true + */ + enablePresenceExpirationEvents?: boolean; + /** * The maximum number of consecutive ping intervals the server will wait * for a pong response before considering the client disconnected. diff --git a/packages/mesh/src/tests/presence-subscription.test.ts b/packages/mesh/src/tests/presence-subscription.test.ts index d7f5463..6d4e34b 100644 --- a/packages/mesh/src/tests/presence-subscription.test.ts +++ b/packages/mesh/src/tests/presence-subscription.test.ts @@ -17,6 +17,7 @@ const createTestServer = (port: number) => }, pingInterval: 1000, latencyInterval: 500, + enablePresenceExpirationEvents: true, }); const flushRedis = async () => { @@ -243,6 +244,39 @@ describe("Presence Subscription", () => { present = await server.presenceManager.getPresentConnections(roomName); expect(present).not.toContain(connection2.id); }); + + test("presence is automatically cleaned up when TTL expires", async () => { + const roomName = "test:room:auto-cleanup"; + const shortTTL = 1000; + + const testServer = createTestServer(port + 100); + await testServer.ready(); + + testServer.trackPresence(roomName, { ttl: shortTTL }); + + const testClient = new MeshClient(`ws://localhost:${port + 100}`); + await testClient.connect(); + + const connections = testServer.connectionManager.getLocalConnections(); + const connection = connections[0]!; + + await testServer.addToRoom(roomName, connection); + + let present = await testServer.presenceManager.getPresentConnections( + roomName + ); + expect(present).toContain(connection.id); + + // wait for more than the TTL to allow the key to expire and notification to be processed + await wait(shortTTL * 3); + + // the connection should be automatically marked as offline when the key expires + present = await testServer.presenceManager.getPresentConnections(roomName); + expect(present).not.toContain(connection.id); + + await testClient.close(); + await testServer.close(); + }, 10000); }); describe("Presence Subscription (Multiple Instances)", () => {