feat: make close return a promise and update README

This commit is contained in:
nvms 2025-03-26 16:57:24 -04:00
parent 20fa3707ff
commit 6be7fbbfe0
4 changed files with 114 additions and 89 deletions

View File

@ -2,25 +2,43 @@
[![NPM version](https://img.shields.io/npm/v/@prsm/duplex?color=a1b858&label=)](https://www.npmjs.com/package/@prsm/duplex) [![NPM version](https://img.shields.io/npm/v/@prsm/duplex?color=a1b858&label=)](https://www.npmjs.com/package/@prsm/duplex)
An optionally-secure, full-duplex TCP command server and client on top of `node:tls` and `node:net`. An optionally-secure, full-duplex TCP command server and client built on top of `node:tls` and `node:net`. Provides reliable, Promise-based communication with automatic reconnection and command queueing.
## Features
- **Promise-based API** - All operations return Promises for easy async/await usage
- **Command queueing** - Commands are automatically queued when offline
- **Reliable connections** - Robust error handling and reconnection
- **Secure communication** - Optional TLS encryption
- **Bidirectional communication** - Full-duplex TCP communication
- **Lightweight** - No external dependencies
## Server ## Server
```typescript ```typescript
import { CommandServer } from "@prsm/duplex"; import { CommandServer } from "@prsm/duplex";
import fs from "node:fs";
// An insecure CommandServer (`Server` from `node:net`) // Create a server instance
const server = new CommandServer({ const server = new CommandServer({
host: "localhost", host: "localhost",
port: 3351, port: 3351,
secure: false, secure: false, // For TLS, set to true and provide certificates
}); });
// A secure CommandServer (`Server` from `node:tls`) // Connect the server (returns a Promise)
// https://nodejs.org/api/tls.html#new-tlstlssocketsocket-options await server.connect();
const server = new CommandServer({
// Register command handlers
server.command(0, async (payload, connection) => {
console.log("Received:", payload);
return { status: "success", data: "Command processed" };
});
// For secure connections (TLS)
const secureServer = new CommandServer({
host: "localhost", host: "localhost",
port: 3351, port: 3352,
secure: true, secure: true,
key: fs.readFileSync("certs/server/server.key"), key: fs.readFileSync("certs/server/server.key"),
cert: fs.readFileSync("certs/server/server.crt"), cert: fs.readFileSync("certs/server/server.crt"),
@ -28,55 +46,79 @@ const server = new CommandServer({
requestCert: true, requestCert: true,
}); });
// ------------------- await secureServer.connect();
// Defining a command handler
server.command(0, async (payload: any, connection: Connection) => {
return { ok: "OK" };
});
``` ```
## Client ## Client
```typescript ```typescript
import { CommandClient } from "@prsm/duplex"; import { CommandClient } from "@prsm/duplex";
import fs from "node:fs";
// An insecure client (`Socket` from `node:net`) // Create a client instance
const client = new CommandClient({ const client = new CommandClient({
host: "localhost", host: "localhost",
port: 3351, port: 3351,
secure: false, secure: false, // For TLS, set to true and provide certificates
}); });
// A secure client (`TLSSocket` from `node:tls`) // Connect to the server (returns a Promise)
const client = new CommandClient({ await client.connect();
// Using Promise-based API
try {
const response = await client.command(0, { action: "getData" }, 5000);
console.log("Response:", response.result);
} catch (error) {
console.error("Error:", error);
}
// Using callback API
client.command(0, { action: "getData" }, 5000, (result, error) => {
if (error) {
console.error("Error:", error);
return;
}
console.log("Response:", result);
});
// For secure connections (TLS)
const secureClient = new CommandClient({
host: "localhost", host: "localhost",
port: 3351, port: 3352,
secure: true, secure: true,
key: fs.readFileSync("certs/client/client.key"), key: fs.readFileSync("certs/client/client.key"),
cert: fs.readFileSync("certs/client/client.crt"), cert: fs.readFileSync("certs/client/client.crt"),
ca: fs.readFileSync("certs/ca/ca.crt"), ca: fs.readFileSync("certs/ca/ca.crt"),
}); });
// ------------------- await secureClient.connect();
// Awaiting the response ```
try {
const response = await client.command(0, { some: "payload" }, 1000); ## Error Handling
// command^ ^payload ^expiration
// response: { ok: "OK" }; The library provides detailed error information with error codes:
} catch (error) {
console.error(error); ```typescript
} try {
await client.command(0, payload, 1000);
// ...or receiving the response in a callback } catch (error) {
const callback = (response: any, error: CodeError) => { if (error.code === 'ETIMEOUT') {
if (error) { console.log('Command timed out');
console.error(error.code); } else if (error.code === 'ENOTFOUND') {
return; console.log('Command not found on server');
} } else {
console.error('Other error:', error.message);
// response is { ok: "OK" } }
}; }
```
// Sending a command to the server
client.command(0, { some: "payload" }, 1000, callback); ## Graceful Shutdown
```typescript
// Close client connection
await client.close();
// Close server
await server.close();
``` ```

View File

@ -58,16 +58,18 @@ class TokenClient extends EventEmitter {
}); });
} }
close(callback?: () => void) { close(callback?: () => void): Promise<void> {
if (this.status <= Status.CLOSED) return false; if (this.status <= Status.CLOSED) return Promise.resolve();
this.status = Status.CLOSED; this.status = Status.CLOSED;
this.socket.end(() => {
this.connection = null;
if (callback) callback();
});
return true; return new Promise<void>((resolve) => {
this.socket.end(() => {
this.connection = null;
if (callback) callback();
resolve();
});
});
} }
send(buffer: Buffer) { send(buffer: Buffer) {
@ -83,6 +85,7 @@ class TokenClient extends EventEmitter {
this.hadError = true; this.hadError = true;
// Don't emit ECONNRESET errors during normal disconnection scenarios // Don't emit ECONNRESET errors during normal disconnection scenarios
// @ts-ignore
if (error.code !== "ECONNRESET" || this.status !== Status.CLOSED) { if (error.code !== "ECONNRESET" || this.status !== Status.CLOSED) {
this.emit("error", error); this.emit("error", error);
} }
@ -152,8 +155,8 @@ class QueueClient extends TokenClient {
} }
} }
close() { close(callback?: () => void): Promise<void> {
return super.close(); return super.close(callback);
} }
} }
@ -287,7 +290,7 @@ export class CommandClient extends QueueClient {
}); });
} }
close() { close(callback?: () => void): Promise<void> {
return super.close(); return super.close(callback);
} }
} }

View File

@ -63,18 +63,20 @@ export class TokenServer extends EventEmitter {
}); });
} }
close(callback?: () => void) { close(callback?: () => void): Promise<void> {
if (!this.server.listening) return false; if (!this.server.listening) return Promise.resolve();
this.status = Status.CLOSED; this.status = Status.CLOSED;
this.server.close(() => {
for (const connection of this.connections) {
connection.remoteClose();
}
if (callback) callback();
});
return true; return new Promise<void>((resolve) => {
this.server.close(() => {
for (const connection of this.connections) {
connection.remoteClose();
}
if (callback) callback();
resolve();
});
});
} }
applyListeners() { applyListeners() {

View File

@ -17,18 +17,13 @@ describe("Advanced CommandClient and CommandServer Tests", () => {
}); });
afterEach(async () => { afterEach(async () => {
// Close connections in order
if (client.status === Status.ONLINE) { if (client.status === Status.ONLINE) {
await new Promise<void>((resolve) => { await client.close();
client.once("close", () => resolve());
client.close();
});
} }
if (server.status === Status.ONLINE) { if (server.status === Status.ONLINE) {
await new Promise<void>((resolve) => { await server.close();
server.once("close", () => resolve());
server.close();
});
} }
}); });
@ -40,16 +35,10 @@ describe("Advanced CommandClient and CommandServer Tests", () => {
expect(client.status).toBe(Status.ONLINE); expect(client.status).toBe(Status.ONLINE);
// First close the client gracefully // First close the client gracefully
await new Promise<void>((resolve) => { await client.close();
client.once("close", () => resolve());
client.close();
});
// Then close the server // Then close the server
await new Promise<void>((resolve) => { await server.close();
server.once("close", () => resolve());
server.close();
});
// Restart server // Restart server
await server.connect(); await server.connect();
@ -151,10 +140,7 @@ describe("Advanced CommandClient and CommandServer Tests", () => {
await expect(commandPromise).resolves.toBe("Echo: Queued Message"); await expect(commandPromise).resolves.toBe("Echo: Queued Message");
// Clean up // Clean up
await new Promise<void>((resolve) => { await queuedClient.close();
queuedClient.once("close", () => resolve());
queuedClient.close();
});
}, 3000); }, 3000);
test("multiple concurrent commands are handled correctly", async () => { test("multiple concurrent commands are handled correctly", async () => {
@ -257,15 +243,7 @@ describe("Advanced CommandClient and CommandServer Tests", () => {
}); });
// Clean up // Clean up
await Promise.all( await Promise.all(clients.map((client) => client.close()));
clients.map(
(client) =>
new Promise<void>((resolve) => {
client.once("close", () => resolve());
client.close();
}),
),
);
}, 5000); }, 5000);
test("command returns promise when no callback provided", async () => { test("command returns promise when no callback provided", async () => {