From 6be7fbbfe0799045fff76649d9fffc57872425fe Mon Sep 17 00:00:00 2001 From: nvms Date: Wed, 26 Mar 2025 16:57:24 -0400 Subject: [PATCH] feat: make close return a promise and update README --- packages/duplex/README.md | 120 +++++++++++++------- packages/duplex/src/client/commandclient.ts | 25 ++-- packages/duplex/src/server/commandserver.ts | 20 ++-- packages/duplex/tests/advanced.test.ts | 38 ++----- 4 files changed, 114 insertions(+), 89 deletions(-) diff --git a/packages/duplex/README.md b/packages/duplex/README.md index a31e68c..de7a713 100644 --- a/packages/duplex/README.md +++ b/packages/duplex/README.md @@ -2,25 +2,43 @@ [![NPM version](https://img.shields.io/npm/v/@prsm/duplex?color=a1b858&label=)](https://www.npmjs.com/package/@prsm/duplex) -An optionally-secure, full-duplex TCP command server and client on top of `node:tls` and `node:net`. +An optionally-secure, full-duplex TCP command server and client built on top of `node:tls` and `node:net`. Provides reliable, Promise-based communication with automatic reconnection and command queueing. + +## Features + +- **Promise-based API** - All operations return Promises for easy async/await usage +- **Command queueing** - Commands are automatically queued when offline +- **Reliable connections** - Robust error handling and reconnection +- **Secure communication** - Optional TLS encryption +- **Bidirectional communication** - Full-duplex TCP communication +- **Lightweight** - No external dependencies ## Server ```typescript import { CommandServer } from "@prsm/duplex"; +import fs from "node:fs"; -// An insecure CommandServer (`Server` from `node:net`) +// Create a server instance const server = new CommandServer({ host: "localhost", port: 3351, - secure: false, + secure: false, // For TLS, set to true and provide certificates }); -// A secure CommandServer (`Server` from `node:tls`) -// https://nodejs.org/api/tls.html#new-tlstlssocketsocket-options -const server = new CommandServer({ +// Connect the server (returns a Promise) +await server.connect(); + +// Register command handlers +server.command(0, async (payload, connection) => { + console.log("Received:", payload); + return { status: "success", data: "Command processed" }; +}); + +// For secure connections (TLS) +const secureServer = new CommandServer({ host: "localhost", - port: 3351, + port: 3352, secure: true, key: fs.readFileSync("certs/server/server.key"), cert: fs.readFileSync("certs/server/server.crt"), @@ -28,55 +46,79 @@ const server = new CommandServer({ requestCert: true, }); -// ------------------- -// Defining a command handler -server.command(0, async (payload: any, connection: Connection) => { - return { ok: "OK" }; -}); +await secureServer.connect(); ``` ## Client ```typescript import { CommandClient } from "@prsm/duplex"; +import fs from "node:fs"; -// An insecure client (`Socket` from `node:net`) +// Create a client instance const client = new CommandClient({ host: "localhost", port: 3351, - secure: false, + secure: false, // For TLS, set to true and provide certificates }); -// A secure client (`TLSSocket` from `node:tls`) -const client = new CommandClient({ +// Connect to the server (returns a Promise) +await client.connect(); + +// Using Promise-based API +try { + const response = await client.command(0, { action: "getData" }, 5000); + console.log("Response:", response.result); +} catch (error) { + console.error("Error:", error); +} + +// Using callback API +client.command(0, { action: "getData" }, 5000, (result, error) => { + if (error) { + console.error("Error:", error); + return; + } + console.log("Response:", result); +}); + +// For secure connections (TLS) +const secureClient = new CommandClient({ host: "localhost", - port: 3351, + port: 3352, secure: true, key: fs.readFileSync("certs/client/client.key"), cert: fs.readFileSync("certs/client/client.crt"), ca: fs.readFileSync("certs/ca/ca.crt"), }); -// ------------------- -// Awaiting the response -try { - const response = await client.command(0, { some: "payload" }, 1000); - // command^ ^payload ^expiration - // response: { ok: "OK" }; -} catch (error) { - console.error(error); -} - -// ...or receiving the response in a callback -const callback = (response: any, error: CodeError) => { - if (error) { - console.error(error.code); - return; - } - - // response is { ok: "OK" } -}; - -// Sending a command to the server -client.command(0, { some: "payload" }, 1000, callback); +await secureClient.connect(); +``` + +## Error Handling + +The library provides detailed error information with error codes: + +```typescript +try { + await client.command(0, payload, 1000); +} catch (error) { + if (error.code === 'ETIMEOUT') { + console.log('Command timed out'); + } else if (error.code === 'ENOTFOUND') { + console.log('Command not found on server'); + } else { + console.error('Other error:', error.message); + } +} +``` + +## Graceful Shutdown + +```typescript +// Close client connection +await client.close(); + +// Close server +await server.close(); ``` diff --git a/packages/duplex/src/client/commandclient.ts b/packages/duplex/src/client/commandclient.ts index 27f244e..a111a9b 100644 --- a/packages/duplex/src/client/commandclient.ts +++ b/packages/duplex/src/client/commandclient.ts @@ -58,16 +58,18 @@ class TokenClient extends EventEmitter { }); } - close(callback?: () => void) { - if (this.status <= Status.CLOSED) return false; + close(callback?: () => void): Promise { + if (this.status <= Status.CLOSED) return Promise.resolve(); this.status = Status.CLOSED; - this.socket.end(() => { - this.connection = null; - if (callback) callback(); - }); - return true; + return new Promise((resolve) => { + this.socket.end(() => { + this.connection = null; + if (callback) callback(); + resolve(); + }); + }); } send(buffer: Buffer) { @@ -83,6 +85,7 @@ class TokenClient extends EventEmitter { this.hadError = true; // Don't emit ECONNRESET errors during normal disconnection scenarios + // @ts-ignore if (error.code !== "ECONNRESET" || this.status !== Status.CLOSED) { this.emit("error", error); } @@ -152,8 +155,8 @@ class QueueClient extends TokenClient { } } - close() { - return super.close(); + close(callback?: () => void): Promise { + return super.close(callback); } } @@ -287,7 +290,7 @@ export class CommandClient extends QueueClient { }); } - close() { - return super.close(); + close(callback?: () => void): Promise { + return super.close(callback); } } diff --git a/packages/duplex/src/server/commandserver.ts b/packages/duplex/src/server/commandserver.ts index c3cd66e..4eecd5a 100644 --- a/packages/duplex/src/server/commandserver.ts +++ b/packages/duplex/src/server/commandserver.ts @@ -63,18 +63,20 @@ export class TokenServer extends EventEmitter { }); } - close(callback?: () => void) { - if (!this.server.listening) return false; + close(callback?: () => void): Promise { + if (!this.server.listening) return Promise.resolve(); this.status = Status.CLOSED; - this.server.close(() => { - for (const connection of this.connections) { - connection.remoteClose(); - } - if (callback) callback(); - }); - return true; + return new Promise((resolve) => { + this.server.close(() => { + for (const connection of this.connections) { + connection.remoteClose(); + } + if (callback) callback(); + resolve(); + }); + }); } applyListeners() { diff --git a/packages/duplex/tests/advanced.test.ts b/packages/duplex/tests/advanced.test.ts index 12b0b3e..37bca5b 100644 --- a/packages/duplex/tests/advanced.test.ts +++ b/packages/duplex/tests/advanced.test.ts @@ -17,18 +17,13 @@ describe("Advanced CommandClient and CommandServer Tests", () => { }); afterEach(async () => { + // Close connections in order if (client.status === Status.ONLINE) { - await new Promise((resolve) => { - client.once("close", () => resolve()); - client.close(); - }); + await client.close(); } if (server.status === Status.ONLINE) { - await new Promise((resolve) => { - server.once("close", () => resolve()); - server.close(); - }); + await server.close(); } }); @@ -40,16 +35,10 @@ describe("Advanced CommandClient and CommandServer Tests", () => { expect(client.status).toBe(Status.ONLINE); // First close the client gracefully - await new Promise((resolve) => { - client.once("close", () => resolve()); - client.close(); - }); - + await client.close(); + // Then close the server - await new Promise((resolve) => { - server.once("close", () => resolve()); - server.close(); - }); + await server.close(); // Restart server await server.connect(); @@ -151,10 +140,7 @@ describe("Advanced CommandClient and CommandServer Tests", () => { await expect(commandPromise).resolves.toBe("Echo: Queued Message"); // Clean up - await new Promise((resolve) => { - queuedClient.once("close", () => resolve()); - queuedClient.close(); - }); + await queuedClient.close(); }, 3000); test("multiple concurrent commands are handled correctly", async () => { @@ -257,15 +243,7 @@ describe("Advanced CommandClient and CommandServer Tests", () => { }); // Clean up - await Promise.all( - clients.map( - (client) => - new Promise((resolve) => { - client.once("close", () => resolve()); - client.close(); - }), - ), - ); + await Promise.all(clients.map((client) => client.close())); }, 5000); test("command returns promise when no callback provided", async () => {