mirror of
https://github.com/nvms/prsm.git
synced 2025-12-15 15:50:53 +00:00
add useful room APIs to the client
This commit is contained in:
parent
fb4f275d58
commit
57af00dc40
@ -12,6 +12,11 @@ Mesh is a command-based WebSocket framework for real-time applications. It uses
|
||||
* [Server configuration](#server-configuration)
|
||||
* [Server publishing](#server-publishing)
|
||||
* [Client usage](#client-usage)
|
||||
* [Rooms](#rooms)
|
||||
* [Joining a room](#joining-a-room)
|
||||
* [Leaving a room](#leaving-a-room)
|
||||
* [Server API](#server-api)
|
||||
* [Access control](#access-control)
|
||||
* [Presence](#presence)
|
||||
* [Server configuration](#server-configuration-1)
|
||||
* [Getting presence information (server-side)](#getting-presence-information-server-side)
|
||||
@ -228,6 +233,87 @@ This feature is great for:
|
||||
- Pub/sub messaging across distributed server instances
|
||||
- Notification feeds with instant context
|
||||
|
||||
## Rooms
|
||||
|
||||
Mesh supports rooms as a first-class concept for organizing connections into logical groups. Clients can join and leave rooms using simple built-in commands.
|
||||
|
||||
Room membership is automatically tracked across server instances using Redis, and cleaned up when connections disconnect.
|
||||
|
||||
### Joining a room
|
||||
|
||||
Use the `joinRoom` method on the client:
|
||||
|
||||
```ts
|
||||
const { success, present } = await client.joinRoom("room:lobby");
|
||||
```
|
||||
|
||||
By default, this joins the room without subscribing to presence.
|
||||
|
||||
To automatically receive presence updates when users join or leave:
|
||||
|
||||
```ts
|
||||
const { success, present } = await client.joinRoom("room:lobby", (update) => {
|
||||
if (update.type === "join") {
|
||||
console.log("User joined:", update.connectionId);
|
||||
} else {
|
||||
console.log("User left:", update.connectionId);
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
This behaves identically to calling `subscribePresence(...)` yourself, but is more convenient. The `present` array includes the list of currently connected users at the time of join.
|
||||
|
||||
> [!NOTE]
|
||||
> If you don’t pass a callback, the `present` array still reflects who is currently in the room — even though no real-time presence tracking is active.
|
||||
|
||||
### Leaving a room
|
||||
|
||||
Call `leaveRoom(...)` to exit the room:
|
||||
|
||||
```ts
|
||||
await client.leaveRoom("room:lobby");
|
||||
```
|
||||
|
||||
If presence was subscribed via `joinRoom(...)`, it will automatically be unsubscribed when leaving.
|
||||
|
||||
This ensures room and presence subscriptions remain in sync without extra work.
|
||||
|
||||
### Server API
|
||||
|
||||
Mesh also exposes room utilities on the server for custom behavior:
|
||||
|
||||
- `server.addToRoom(roomName, connection)`
|
||||
- `server.removeFromRoom(roomName, connection)`
|
||||
- `server.isInRoom(roomName, connection)` → `boolean`
|
||||
- `server.getRoomMembers(roomName)` → `string[]`
|
||||
- `server.removeFromAllRooms(connection)`
|
||||
- `server.clearRoom(roomName)`
|
||||
|
||||
These can be used to implement custom commands or manage room state manually if needed.
|
||||
|
||||
### Access control
|
||||
|
||||
You can guard room joins using command middleware, just like any other command. The built-in room join command is "mesh/join-room", and the payload contains a `roomName` string:
|
||||
|
||||
```ts
|
||||
server.addMiddleware(async (ctx) => {
|
||||
if (ctx.command === "mesh/join-room") {
|
||||
const { roomName } = ctx.payload;
|
||||
const meta = await server.connectionManager.getMetadata(ctx.connection);
|
||||
|
||||
if (!meta?.canJoinRooms) {
|
||||
throw new Error("Access denied");
|
||||
}
|
||||
|
||||
if (roomName.startsWith("admin:") && !meta.isAdmin) {
|
||||
throw new Error("Admins only");
|
||||
}
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
This gives you full flexibility to enforce auth, roles, or custom logic per room.
|
||||
|
||||
## Presence
|
||||
|
||||
Mesh provides a built-in presence system that tracks which connections are present in specific rooms and notifies clients when connections join or leave. This is ideal for building features like "who's online" indicators, or any real-time awareness of other users.
|
||||
@ -246,7 +332,9 @@ server.trackPresence(/^room:.*$/);
|
||||
// track presence for a specific room
|
||||
server.trackPresence("lobby");
|
||||
|
||||
// guard who can see presence
|
||||
// guard who can see presence.
|
||||
// clients who attempt to subscribe to the presence of this room
|
||||
// will be rejected if the guard returns false
|
||||
server.trackPresence("admin-room", async (conn, roomName) => {
|
||||
const meta = await server.connectionManager.getMetadata(conn);
|
||||
return meta?.isAdmin === true;
|
||||
@ -265,12 +353,12 @@ server.trackPresence("vip-room", {
|
||||
});
|
||||
```
|
||||
|
||||
When presence is enabled for a room, Mesh automatically:
|
||||
When presence tracking is enabled for a room, Mesh automatically:
|
||||
|
||||
1. Tracks which connections are present in the room
|
||||
2. Emits presence events when connections join or leave
|
||||
3. Refreshes each connection's presence TTL automatically as long as it remains connected and responding to pings
|
||||
4. Cleans up presence when connections disconnect
|
||||
- Detects and records the connection IDs of clients joining the room.
|
||||
- Emits real‐time “join” and “leave” events to subscribed clients.
|
||||
- Automatically refreshes each connection’s presence using a configurable TTL as long as the client remains active.
|
||||
- Cleans up expired or disconnected entries to maintain an up-to-date presence list.
|
||||
|
||||
### Getting presence information (server-side)
|
||||
|
||||
|
||||
@ -9,6 +9,14 @@ import type { Operation } from "fast-json-patch";
|
||||
export { Status } from "../common/status";
|
||||
export { applyPatch } from "fast-json-patch";
|
||||
|
||||
export type PresenceUpdate =
|
||||
| { type: "join"; connectionId: string }
|
||||
| { type: "leave"; connectionId: string };
|
||||
|
||||
export type PresenceUpdateCallback = (
|
||||
update: PresenceUpdate
|
||||
) => void | Promise<void>;
|
||||
|
||||
export type MeshClientOptions = Partial<{
|
||||
/**
|
||||
* The number of milliseconds to wait before considering the connection closed due to inactivity.
|
||||
@ -468,20 +476,21 @@ export class MeshClient extends EventEmitter {
|
||||
|
||||
const historyLimit = options?.historyLimit;
|
||||
|
||||
return this.command("mesh/subscribe-channel", { channel, historyLimit }).then(
|
||||
(result) => {
|
||||
if (result.success && result.history && result.history.length > 0) {
|
||||
result.history.forEach((message: string) => {
|
||||
callback(message);
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
success: result.success,
|
||||
history: result.history || [],
|
||||
};
|
||||
return this.command("mesh/subscribe-channel", {
|
||||
channel,
|
||||
historyLimit,
|
||||
}).then((result) => {
|
||||
if (result.success && result.history && result.history.length > 0) {
|
||||
result.history.forEach((message: string) => {
|
||||
callback(message);
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
return {
|
||||
success: result.success,
|
||||
history: result.history || [],
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -515,7 +524,10 @@ export class MeshClient extends EventEmitter {
|
||||
const mode = options?.mode ?? "full";
|
||||
|
||||
try {
|
||||
const result = await this.command("mesh/subscribe-record", { recordId, mode });
|
||||
const result = await this.command("mesh/subscribe-record", {
|
||||
recordId,
|
||||
mode,
|
||||
});
|
||||
|
||||
if (result.success) {
|
||||
this.recordSubscriptions.set(recordId, {
|
||||
@ -553,7 +565,9 @@ export class MeshClient extends EventEmitter {
|
||||
*/
|
||||
async unsubscribeRecord(recordId: string): Promise<boolean> {
|
||||
try {
|
||||
const success = await this.command("mesh/unsubscribe-record", { recordId });
|
||||
const success = await this.command("mesh/unsubscribe-record", {
|
||||
recordId,
|
||||
});
|
||||
if (success) {
|
||||
this.recordSubscriptions.delete(recordId);
|
||||
}
|
||||
@ -590,6 +604,53 @@ export class MeshClient extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to join the specified room and optionally subscribes to presence updates.
|
||||
* If a callback for presence updates is provided, the method subscribes to presence changes and invokes the callback when updates occur.
|
||||
*
|
||||
* @param {string} roomName - The name of the room to join.
|
||||
* @param {PresenceUpdateCallback=} onPresenceUpdate - Optional callback to receive presence updates for the room.
|
||||
* @returns {Promise<{ success: boolean; present: string[] }>} A promise that resolves with an object indicating whether joining was successful and the list of present members.
|
||||
* @throws {Error} If an error occurs during the join or subscription process, the promise may be rejected with the error.
|
||||
*/
|
||||
async joinRoom(
|
||||
roomName: string,
|
||||
onPresenceUpdate?: PresenceUpdateCallback
|
||||
): Promise<{ success: boolean; present: string[] }> {
|
||||
const joinResult = await this.command("mesh/join-room", { roomName });
|
||||
|
||||
if (!joinResult.success) {
|
||||
return { success: false, present: [] };
|
||||
}
|
||||
|
||||
if (!onPresenceUpdate) {
|
||||
return { success: true, present: joinResult.present || [] };
|
||||
}
|
||||
|
||||
const { success: subSuccess, present } = await this.subscribePresence(
|
||||
roomName,
|
||||
onPresenceUpdate
|
||||
);
|
||||
return { success: subSuccess, present };
|
||||
}
|
||||
|
||||
/**
|
||||
* Leaves the specified room and unsubscribes from presence updates if subscribed.
|
||||
*
|
||||
* @param {string} roomName - The name of the room to leave.
|
||||
* @returns {Promise<{ success: boolean }>} A promise that resolves to an object indicating whether leaving the room was successful.
|
||||
* @throws {Error} If the underlying command or unsubscribe operation fails, the promise may be rejected with an error.
|
||||
*/
|
||||
async leaveRoom(roomName: string): Promise<{ success: boolean }> {
|
||||
const result = await this.command("mesh/leave-room", { roomName });
|
||||
|
||||
if (result.success && this.presenceSubscriptions.has(roomName)) {
|
||||
await this.unsubscribePresence(roomName);
|
||||
}
|
||||
|
||||
return { success: result.success };
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribes to presence updates for a specific room.
|
||||
*
|
||||
@ -599,16 +660,12 @@ export class MeshClient extends EventEmitter {
|
||||
*/
|
||||
async subscribePresence(
|
||||
roomName: string,
|
||||
callback: (update: {
|
||||
type: "join" | "leave";
|
||||
connectionId: string;
|
||||
roomName: string;
|
||||
timestamp: number;
|
||||
metadata?: any;
|
||||
}) => void | Promise<void>
|
||||
callback: PresenceUpdateCallback
|
||||
): Promise<{ success: boolean; present: string[] }> {
|
||||
try {
|
||||
const result = await this.command("mesh/subscribe-presence", { roomName });
|
||||
const result = await this.command("mesh/subscribe-presence", {
|
||||
roomName,
|
||||
});
|
||||
|
||||
if (result.success) {
|
||||
this.presenceSubscriptions.set(roomName, callback);
|
||||
@ -635,7 +692,9 @@ export class MeshClient extends EventEmitter {
|
||||
*/
|
||||
async unsubscribePresence(roomName: string): Promise<boolean> {
|
||||
try {
|
||||
const success = await this.command("mesh/unsubscribe-presence", { roomName });
|
||||
const success = await this.command("mesh/unsubscribe-presence", {
|
||||
roomName,
|
||||
});
|
||||
if (success) {
|
||||
this.presenceSubscriptions.delete(roomName);
|
||||
}
|
||||
|
||||
@ -740,6 +740,27 @@ export class MeshServer extends WebSocketServer {
|
||||
return false;
|
||||
}
|
||||
);
|
||||
|
||||
this.registerCommand<
|
||||
{ roomName: string },
|
||||
{ success: boolean; present: string[] }
|
||||
>("mesh/join-room", async (ctx) => {
|
||||
const { roomName } = ctx.payload;
|
||||
await this.addToRoom(roomName, ctx.connection);
|
||||
const present = await this.presenceManager.getPresentConnections(
|
||||
roomName
|
||||
);
|
||||
return { success: true, present };
|
||||
});
|
||||
|
||||
this.registerCommand<{ roomName: string }, { success: boolean }>(
|
||||
"mesh/leave-room",
|
||||
async (ctx) => {
|
||||
const { roomName } = ctx.payload;
|
||||
await this.removeFromRoom(roomName, ctx.connection);
|
||||
return { success: true };
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private registerRecordCommands() {
|
||||
|
||||
@ -270,30 +270,6 @@ describe("Presence Subscription (Multiple Instances)", () => {
|
||||
await serverA.ready();
|
||||
await serverB.ready();
|
||||
|
||||
// register join/leave commands on both servers
|
||||
[serverA, serverB].forEach((server) => {
|
||||
server.registerCommand("join-room", async (ctx) => {
|
||||
const { roomName } = ctx.payload;
|
||||
try {
|
||||
await server.addToRoom(roomName, ctx.connection);
|
||||
return { success: true };
|
||||
} catch (e) {
|
||||
console.error(`[Test Setup] Failed to join room ${roomName}:`, e);
|
||||
return { success: false };
|
||||
}
|
||||
});
|
||||
server.registerCommand("leave-room", async (ctx) => {
|
||||
const { roomName } = ctx.payload;
|
||||
try {
|
||||
await server.removeFromRoom(roomName, ctx.connection);
|
||||
return { success: true };
|
||||
} catch (e) {
|
||||
console.error(`[Test Setup] Failed to leave room ${roomName}:`, e);
|
||||
return { success: false };
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// server a client:
|
||||
clientA = new MeshClient(`ws://localhost:${portA}`);
|
||||
|
||||
@ -325,7 +301,7 @@ describe("Presence Subscription (Multiple Instances)", () => {
|
||||
);
|
||||
expect(initialPresentA).toEqual([]); // empty room
|
||||
|
||||
const joinResultB = await clientB.command("join-room", { roomName });
|
||||
const joinResultB = await clientB.joinRoom(roomName);
|
||||
expect(joinResultB.success).toBe(true);
|
||||
|
||||
await wait(150);
|
||||
@ -356,7 +332,7 @@ describe("Presence Subscription (Multiple Instances)", () => {
|
||||
);
|
||||
expect(initialPresentA).toEqual([]);
|
||||
|
||||
await clientB.command("join-room", { roomName });
|
||||
await clientB.joinRoom(roomName);
|
||||
await wait(150);
|
||||
|
||||
// client a receives join event from client b
|
||||
@ -366,7 +342,7 @@ describe("Presence Subscription (Multiple Instances)", () => {
|
||||
);
|
||||
|
||||
// client B leaves the room via srv b
|
||||
const leaveResultB = await clientB.command("leave-room", { roomName });
|
||||
const leaveResultB = await clientB.leaveRoom(roomName);
|
||||
expect(leaveResultB.success).toBe(true);
|
||||
|
||||
await wait(150);
|
||||
@ -398,7 +374,7 @@ describe("Presence Subscription (Multiple Instances)", () => {
|
||||
);
|
||||
expect(initialPresentA).toEqual([]);
|
||||
|
||||
await clientB.command("join-room", { roomName });
|
||||
await clientB.joinRoom(roomName);
|
||||
await wait(150);
|
||||
|
||||
expect(callbackA).toHaveBeenCalledTimes(1);
|
||||
@ -435,9 +411,9 @@ describe("Presence Subscription (Multiple Instances)", () => {
|
||||
expect(clientCId).toBeDefined();
|
||||
|
||||
// client b -> srv b
|
||||
await clientB.command("join-room", { roomName });
|
||||
await clientB.joinRoom(roomName);
|
||||
// client c -> srv b
|
||||
await clientC.command("join-room", { roomName });
|
||||
await clientC.joinRoom(roomName);
|
||||
|
||||
await wait(150);
|
||||
|
||||
|
||||
@ -47,18 +47,12 @@ describe("MeshServer", () => {
|
||||
});
|
||||
|
||||
test("isInRoom", async () => {
|
||||
server.registerCommand("join-room", async (ctx) => {
|
||||
const { roomName } = ctx.payload;
|
||||
await server.roomManager.addToRoom(roomName, ctx.connection);
|
||||
return { success: true };
|
||||
});
|
||||
|
||||
await clientA.connect();
|
||||
await clientB.connect();
|
||||
|
||||
await clientA.command("join-room", { roomName: "room1" });
|
||||
await clientB.command("join-room", { roomName: "room1" });
|
||||
await clientA.command("join-room", { roomName: "room2" });
|
||||
await clientA.joinRoom("room1");
|
||||
await clientB.joinRoom("room1");
|
||||
await clientA.joinRoom("room2");
|
||||
|
||||
const connectionA = server.connectionManager.getLocalConnections()[0]!;
|
||||
const connectionB = server.connectionManager.getLocalConnections()[1]!;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user