diff --git a/packages/duplex/bun.lockb b/packages/duplex/bun.lockb index e080ae2..30705fa 100755 Binary files a/packages/duplex/bun.lockb and b/packages/duplex/bun.lockb differ diff --git a/packages/duplex/package.json b/packages/duplex/package.json index 755fc52..3060d25 100644 --- a/packages/duplex/package.json +++ b/packages/duplex/package.json @@ -15,13 +15,15 @@ "license": "Apache-2.0", "scripts": { "build": "tsup", - "release": "bumpp package.json && npm publish --access public" + "release": "bumpp package.json && npm publish --access public", + "test": "vitest" }, "type": "module", "devDependencies": { "@types/node": "^22.4.1", "bumpp": "^9.5.1", "tsup": "^8.2.4", - "typescript": "^5.5.4" + "typescript": "^5.5.4", + "vitest": "^2.1.4" } } diff --git a/packages/duplex/src/client/commandclient.ts b/packages/duplex/src/client/commandclient.ts index be385de..27f244e 100644 --- a/packages/duplex/src/client/commandclient.ts +++ b/packages/duplex/src/client/commandclient.ts @@ -9,9 +9,10 @@ import { Status } from "../common/status"; import { IdManager } from "../server/ids"; import { Queue } from "./queue"; -export type TokenClientOptions = tls.ConnectionOptions & net.NetConnectOpts & { - secure: boolean; -}; +export type TokenClientOptions = tls.ConnectionOptions & + net.NetConnectOpts & { + secure: boolean; + }; class TokenClient extends EventEmitter { public options: TokenClientOptions; @@ -23,27 +24,38 @@ class TokenClient extends EventEmitter { constructor(options: TokenClientOptions) { super(); this.options = options; - this.connect(); + this.status = Status.OFFLINE; // Initialize status but don't connect yet } - connect(callback?: () => void) { + connect(callback?: () => void): Promise { if (this.status >= Status.CLOSED) { - return false; + return Promise.resolve(); } - this.hadError = false; - this.status = Status.CONNECTING; + return new Promise((resolve, reject) => { + this.hadError = false; + this.status = Status.CONNECTING; - if (this.options.secure) { - this.socket = tls.connect(this.options, callback); - } else { - this.socket = net.connect(this.options, callback); - } + const onConnect = () => { + if (callback) callback(); + resolve(); + }; - this.connection = null; - this.applyListeners(); + if (this.options.secure) { + this.socket = tls.connect(this.options, onConnect); + } else { + this.socket = net.connect(this.options, onConnect); + } - return true; + this.socket.once("error", (err) => { + if (this.status === Status.CONNECTING) { + reject(err); + } + }); + + this.connection = null; + this.applyListeners(); + }); } close(callback?: () => void) { @@ -69,7 +81,11 @@ class TokenClient extends EventEmitter { private applyListeners() { this.socket.on("error", (error) => { this.hadError = true; - this.emit("error", error); + + // Don't emit ECONNRESET errors during normal disconnection scenarios + if (error.code !== "ECONNRESET" || this.status !== Status.CLOSED) { + this.emit("error", error); + } }); this.socket.on("close", () => { @@ -123,11 +139,17 @@ class QueueClient extends TokenClient { private applyEvents() { this.on("connect", () => { - while (!this.queue.isEmpty) { - const item = this.queue.pop(); + this.processQueue(); + }); + } + + private processQueue() { + while (!this.queue.isEmpty) { + const item = this.queue.pop(); + if (item) { this.sendBuffer(item.value, item.expiresIn); } - }); + } } close() { @@ -136,9 +158,9 @@ class QueueClient extends TokenClient { } export class CommandClient extends QueueClient { - private ids = new IdManager(0xFFFF); + private ids = new IdManager(0xffff); private callbacks: { - [id: number]: (error: Error | null, result?: any) => void + [id: number]: (result: any, error?: Error) => void; } = {}; constructor(options: TokenClientOptions) { @@ -154,9 +176,9 @@ export class CommandClient extends QueueClient { if (this.callbacks[data.id]) { if (data.command === 255) { const error = ErrorSerializer.deserialize(data.payload); - this.callbacks[data.id](error, undefined); + this.callbacks[data.id](undefined, error); } else { - this.callbacks[data.id](null, data.payload); + this.callbacks[data.id](data.payload, null); } } } catch (error) { @@ -165,13 +187,39 @@ export class CommandClient extends QueueClient { }); } - async command(command: number, payload: any, expiresIn: number = 30_000, callback: (result: any, error: CodeError | Error | null) => void | undefined = undefined) { + async command( + command: number, + payload: any, + expiresIn: number = 30_000, + callback: ( + result: any, + error: CodeError | Error | null, + ) => void | undefined = undefined, + ) { if (command === 255) { - throw new CodeError("Command 255 is reserved.", "ERESERVED", "CommandError"); + throw new CodeError( + "Command 255 is reserved.", + "ERESERVED", + "CommandError", + ); + } + + // Ensure we're connected before sending commands + if (this.status < Status.ONLINE) { + try { + await this.connect(); + } catch (err) { + if (typeof callback === "function") { + callback(undefined, err as Error); + return; + } else { + throw err; + } + } } const id = this.ids.reserve(); - const buffer = Command.toBuffer({ id, command, payload }) + const buffer = Command.toBuffer({ id, command, payload }); this.sendBuffer(buffer, expiresIn); @@ -189,10 +237,18 @@ export class CommandClient extends QueueClient { const ret = await Promise.race([response, timeout]); try { - callback(ret, undefined); - } catch (callbackError) { /* */ } - } catch (error) { - callback(undefined, error); + if (ret.error) { + callback(undefined, ret.error); + } else { + callback(ret.result, undefined); + } + // callback(ret, undefined); + } catch (callbackError) { + /* */ + } + } catch (error: unknown) { + const err = error as { result: any; error: any }; + callback(undefined, err.error); } } else { return Promise.race([response, timeout]); @@ -200,27 +256,34 @@ export class CommandClient extends QueueClient { } private createTimeoutPromise(id: number, expiresIn: number) { - return new Promise((resolve, reject) => { + return new Promise<{ error: any; result: any }>((_, reject) => { setTimeout(() => { this.ids.release(id); delete this.callbacks[id]; - reject(new CodeError("Command timed out.", "ETIMEOUT", "CommandError")); + reject({ + error: new CodeError( + "Command timed out.", + "ETIMEOUT", + "CommandError", + ), + result: null, + }); }, expiresIn); }); } private createResponsePromise(id: number) { - return new Promise((resolve, reject) => { - this.callbacks[id] = (error: Error | null, result?: any) => { + return new Promise<{ error: any; result: any }>((resolve, reject) => { + this.callbacks[id] = (result: any, error?: Error) => { this.ids.release(id); delete this.callbacks[id]; if (error) { - reject(error); + reject({ error, result: null }); } else { - resolve(result); + resolve({ result, error: null }); } - } + }; }); } diff --git a/packages/duplex/src/example/client.ts b/packages/duplex/src/example/client.ts index 158dc44..71deba1 100644 --- a/packages/duplex/src/example/client.ts +++ b/packages/duplex/src/example/client.ts @@ -10,18 +10,23 @@ const client = new CommandClient({ const payload = { things: "stuff", numbers: [1, 2, 3] }; async function main() { - const callback = (result: any, error: CodeError) => { - if (error) { - console.log("ERR [0]", error.code); - return; - } + try { + await client.connect(); - console.log("RECV [0]", result); - client.close(); - }; + const callback = (result: any, error: CodeError) => { + if (error) { + console.log("ERR [0]", error.code); + return; + } - client.command(0, payload, 10, callback); + console.log("RECV [0]", result); + client.close(); + }; + client.command(0, payload, 10, callback); + } catch (err) { + console.error("Connection error:", err); + } } main(); diff --git a/packages/duplex/src/example/server.ts b/packages/duplex/src/example/server.ts index d4b5a83..57f35fb 100644 --- a/packages/duplex/src/example/server.ts +++ b/packages/duplex/src/example/server.ts @@ -8,6 +8,11 @@ const server = new CommandServer({ secure: false, }); +server.connect().catch((err) => { + console.error("Failed to start server:", err); + process.exit(1); +}); + server.command(0, async (payload: any, connection: Connection) => { console.log("RECV [0]:", payload); return { ok: "OK" }; diff --git a/packages/duplex/src/server/commandserver.ts b/packages/duplex/src/server/commandserver.ts index 2c89fad..c3cd66e 100644 --- a/packages/duplex/src/server/commandserver.ts +++ b/packages/duplex/src/server/commandserver.ts @@ -7,9 +7,11 @@ import { Connection } from "../common/connection"; import { ErrorSerializer } from "../common/errorserializer"; import { Status } from "../common/status"; -export type TokenServerOptions = tls.TlsOptions & net.ListenOptions & net.SocketConstructorOpts & { - secure?: boolean; -}; +export type TokenServerOptions = tls.TlsOptions & + net.ListenOptions & + net.SocketConstructorOpts & { + secure?: boolean; + }; export class TokenServer extends EventEmitter { connections: Connection[] = []; @@ -24,13 +26,14 @@ export class TokenServer extends EventEmitter { super(); this.options = options; + this.status = Status.OFFLINE; if (this.options.secure) { this.server = tls.createServer(this.options, function (clientSocket) { clientSocket.on("error", (err) => { this.emit("clientError", err); }); - }) + }); } else { this.server = net.createServer(this.options, function (clientSocket) { clientSocket.on("error", (err) => { @@ -40,18 +43,24 @@ export class TokenServer extends EventEmitter { } this.applyListeners(); - this.connect(); + // Don't automatically connect in constructor } - connect(callback?: () => void) { - if (this.status >= Status.CONNECTING) return false; + connect(callback?: () => void): Promise { + if (this.status >= Status.CONNECTING) return Promise.resolve(); this.hadError = false; this.status = Status.CONNECTING; - this.server.listen(this.options, () => { - if (callback) callback(); + + return new Promise((resolve) => { + this.server.listen(this.options, () => { + // Wait a small tick to ensure the server socket is fully bound + setImmediate(() => { + if (callback) callback(); + resolve(); + }); + }); }); - return true; } close(callback?: () => void) { @@ -129,7 +138,7 @@ type CommandFn = (payload: any, connection: Connection) => Promise; export class CommandServer extends TokenServer { private commands: { - [command: number]: CommandFn + [command: number]: CommandFn; } = {}; constructor(options: TokenServerOptions) { @@ -157,10 +166,26 @@ export class CommandServer extends TokenServer { this.commands[command] = fn; } - private async runCommand(id: number, command: number, payload: any, connection: Connection) { + private async runCommand( + id: number, + command: number, + payload: any, + connection: Connection, + ) { try { if (!this.commands[command]) { - throw new CodeError(`Command (${command}) not found.`, "ENOTFOUND", "CommandError"); + connection.send( + Command.toBuffer({ + command: 255, + id, + payload: new CodeError( + `Command (${command}) not found.`, + "ENOTFOUND", + "CommandError", + ), + }), + ); + return; } const result = await this.commands[command](payload, connection); @@ -169,7 +194,9 @@ export class CommandServer extends TokenServer { // we respond with a simple "OK". const payloadResult = result === undefined ? "OK" : result; - connection.send(Command.toBuffer({ command, id, payload: payloadResult })); + connection.send( + Command.toBuffer({ command, id, payload: payloadResult }), + ); } catch (error) { const payload = ErrorSerializer.serialize(error); diff --git a/packages/duplex/src/server/ids.ts b/packages/duplex/src/server/ids.ts index ca1cbae..1d0d252 100644 --- a/packages/duplex/src/server/ids.ts +++ b/packages/duplex/src/server/ids.ts @@ -9,7 +9,9 @@ export class IdManager { release(id: number) { if (id < 0 || id > this.maxIndex) { - throw new TypeError(`ID must be between 0 and ${this.maxIndex}. Got ${id}.`); + throw new TypeError( + `ID must be between 0 and ${this.maxIndex}. Got ${id}.`, + ); } this.ids[id] = false; } @@ -33,7 +35,9 @@ export class IdManager { } if (this.index === startIndex) { - throw new Error(`All IDs are reserved. Make sure to release IDs when they are no longer used.`); + throw new Error( + `All IDs are reserved. Make sure to release IDs when they are no longer used.`, + ); } } } diff --git a/packages/duplex/tests/advanced.test.ts b/packages/duplex/tests/advanced.test.ts new file mode 100644 index 0000000..12b0b3e --- /dev/null +++ b/packages/duplex/tests/advanced.test.ts @@ -0,0 +1,282 @@ +import { describe, test, expect, beforeEach, afterEach } from "vitest"; +import { CommandClient, CommandServer, Status } from "../src/index"; + +describe("Advanced CommandClient and CommandServer Tests", () => { + const serverOptions = { host: "localhost", port: 8125, secure: false }; + const clientOptions = { host: "localhost", port: 8125, secure: false }; + let server: CommandServer; + let client: CommandClient; + + beforeEach(() => { + server = new CommandServer(serverOptions); + server.command(100, async (payload) => { + return `Echo: ${payload}`; + }); + + client = new CommandClient(clientOptions); + }); + + afterEach(async () => { + if (client.status === Status.ONLINE) { + await new Promise((resolve) => { + client.once("close", () => resolve()); + client.close(); + }); + } + + if (server.status === Status.ONLINE) { + await new Promise((resolve) => { + server.once("close", () => resolve()); + server.close(); + }); + } + }); + + test("client reconnects after server restart", async () => { + await server.connect(); + await client.connect(); + + // Verify initial connection + expect(client.status).toBe(Status.ONLINE); + + // First close the client gracefully + await new Promise((resolve) => { + client.once("close", () => resolve()); + client.close(); + }); + + // Then close the server + await new Promise((resolve) => { + server.once("close", () => resolve()); + server.close(); + }); + + // Restart server + await server.connect(); + + // Reconnect client + await client.connect(); + + // Verify reconnection worked + expect(client.status).toBe(Status.ONLINE); + + // Verify functionality after reconnection + return new Promise((resolve, reject) => { + client.command(100, "After Reconnect", 5000, (result, error) => { + try { + expect(error).toBeUndefined(); + expect(result).toBe("Echo: After Reconnect"); + resolve(); + } catch (e) { + reject(e); + } + }); + }); + }, 5000); + + test("command times out when server doesn't respond", async () => { + await server.connect(); + await client.connect(); + + // A command that never responds + server.command(101, async () => { + return new Promise(() => {}); + }); + + // Expect it to fail after a short timeout + await expect( + new Promise((resolve, reject) => { + client.command(101, "Should timeout", 500, (result, error) => { + if (error) { + reject(error); + } else { + resolve(result); + } + }); + }), + ).rejects.toHaveProperty("code", "ETIMEOUT"); + }, 2000); + + test("server errors are properly serialized to client", async () => { + await server.connect(); + await client.connect(); + + server.command(102, async () => { + const error = new Error("Custom server error") as any; + error.code = "ECUSTOM"; + error.name = "CustomError"; + throw error; + }); + + // Expect to receive this error + await expect( + new Promise((resolve, reject) => { + client.command(102, "Will error", 1000, (result, error) => { + if (error) { + reject(error); + } else { + resolve(result); + } + }); + }), + ).rejects.toMatchObject({ + message: "Custom server error", + name: "CustomError", + code: "ECUSTOM", + }); + }, 2000); + + test("commands are queued when client is offline and sent when reconnected", async () => { + // Start with server but no client connection + await server.connect(); + + // Create client but don't connect yet + const queuedClient = new CommandClient(clientOptions); + + // Queue a command while offline + const commandPromise = new Promise((resolve, reject) => { + queuedClient.command(100, "Queued Message", 5000, (result, error) => { + if (error) { + reject(error); + } else { + resolve(result); + } + }); + }); + + // Now connect the client - the queued command should be sent + await queuedClient.connect(); + + // Verify the queued command was processed + await expect(commandPromise).resolves.toBe("Echo: Queued Message"); + + // Clean up + await new Promise((resolve) => { + queuedClient.once("close", () => resolve()); + queuedClient.close(); + }); + }, 3000); + + test("multiple concurrent commands are handled correctly", async () => { + await server.connect(); + await client.connect(); + + // Register commands with different delays + server.command(103, async (payload) => { + await new Promise((r) => setTimeout(r, 50)); + return `Fast: ${payload}`; + }); + + server.command(104, async (payload) => { + await new Promise((r) => setTimeout(r, 150)); + return `Slow: ${payload}`; + }); + + // Send multiple commands concurrently + const results = await Promise.all([ + new Promise((resolve, reject) => { + client.command(103, "First", 1000, (result, error) => { + if (error) reject(error); + else resolve(result); + }); + }), + new Promise((resolve, reject) => { + client.command(104, "Second", 1000, (result, error) => { + if (error) reject(error); + else resolve(result); + }); + }), + new Promise((resolve, reject) => { + client.command(100, "Third", 1000, (result, error) => { + if (error) reject(error); + else resolve(result); + }); + }), + ]); + + // Verify all commands completed successfully + expect(results).toEqual(["Fast: First", "Slow: Second", "Echo: Third"]); + }, 3000); + + test("handles large payloads correctly", async () => { + await server.connect(); + await client.connect(); + + const largeData = { + array: Array(1000) + .fill(0) + .map((_, i) => `item-${i}`), + nested: { + deep: { + object: { + with: "lots of data", + }, + }, + }, + }; + + const result = await new Promise((resolve, reject) => { + client.command(100, largeData, 5000, (result, error) => { + if (error) reject(error); + else resolve(result); + }); + }); + + // Verify the response contains the expected prefix + expect(typeof result).toBe("string"); + expect((result as string).startsWith("Echo: ")).toBe(true); + }, 10000); + + test("server handles multiple client connections", async () => { + await server.connect(); + + // Create multiple clients + const clients = Array(5) + .fill(0) + .map(() => new CommandClient(clientOptions)); + + // Connect all clients + await Promise.all(clients.map((client) => client.connect())); + + // Send a command from each client + const results = await Promise.all( + clients.map( + (client, i) => + new Promise((resolve, reject) => { + client.command(100, `Client ${i}`, 1000, (result, error) => { + if (error) reject(error); + else resolve(result); + }); + }), + ), + ); + + // Verify all commands succeeded + results.forEach((result, i) => { + expect(result).toBe(`Echo: Client ${i}`); + }); + + // Clean up + await Promise.all( + clients.map( + (client) => + new Promise((resolve) => { + client.once("close", () => resolve()); + client.close(); + }), + ), + ); + }, 5000); + + test("command returns promise when no callback provided", async () => { + await server.connect(); + await client.connect(); + + // Use the promise-based API + const result = await client.command(100, "Promise API"); + + // Verify the result + expect(result).toHaveProperty("result", "Echo: Promise API"); + expect(result).toHaveProperty("error", null); + }, 2000); +}); diff --git a/packages/duplex/tests/commandclient.test.ts b/packages/duplex/tests/commandclient.test.ts index a810700..3664718 100644 --- a/packages/duplex/tests/commandclient.test.ts +++ b/packages/duplex/tests/commandclient.test.ts @@ -1,8 +1,6 @@ import { describe, test, expect, beforeEach, afterEach } from "vitest"; import { CommandClient, CommandServer } from "../src/index"; -const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); - describe("CommandClient and CommandServer", () => { const serverOptions = { host: "localhost", port: 8124, secure: false }; const clientOptions = { host: "localhost", port: 8124, secure: false }; @@ -18,100 +16,49 @@ describe("CommandClient and CommandServer", () => { client = new CommandClient(clientOptions); }); - afterEach(() => { - if (client.status === 3) { // ONLINE - client.close(); + afterEach(async () => { + if (client.status === 3) { + // ONLINE + await new Promise((resolve) => { + client.once("close", () => resolve()); + client.close(); + }); } - if (server.status === 3) { // ONLINE - server.close(); + + if (server.status === 3) { + // ONLINE + await new Promise((resolve) => { + server.once("close", () => resolve()); + server.close(); + }); } }); test("client-server connection should be online", async () => { - await new Promise((resolve) => { - server.once("listening", () => { - client.once("connect", () => { - expect(client.status).toBe(3); // ONLINE - resolve(); - }); - }); - server.connect(); - }); - }, 5000); + await server.connect(); + await client.connect(); + expect(client.status).toBe(3); // ONLINE + }, 1000); test("simple echo command", async () => { - await new Promise((resolve) => { - server.once("listening", () => { - client.once("connect", () => { - client.command(100, "Hello", 5000, (result, error) => { + try { + await server.connect(); + + await client.connect(); + + return new Promise((resolve, reject) => { + client.command(100, "Hello", 5000, (result, error) => { + try { expect(error).toBeUndefined(); expect(result).toBe("Echo: Hello"); resolve(); - }); + } catch (e) { + reject(e); + } }); }); - server.connect(); - }); - }, 5000); - - // test("handle unknown command", async () => { - // await sleep(1000); - // await new Promise((resolve) => { - // server.once("listening", () => { - // console.log("Listening! (unknown command)"); - // client.once("connect", () => { - // console.log("Client connected, sending command."); - // client.command(55, "Hello", 1000, (result, error) => { - // console.log("Client callback CALLED! with result", result, "and error", error); - // expect(result).toBeUndefined(); - // // expect(error).toBeDefined(); - // // expect(error.code).toBe("ENOTFOUND"); - // resolve(); - // }); - // }); - // }); - // server.connect(); - // }); - // }, 2000); // Increased timeout - - // test("command should timeout without server response", async () => { - // await new Promise((resolve) => { - // server.once("listening", () => { - // client.once("connect", () => { - // client.command(101, "No response", 1000, (result, error) => { - // expect(result).toBeUndefined(); - // expect(error).toBeInstanceOf(CodeError); - // expect(error.code).toBe("ETIMEOUT"); - // resolve(); - // }); - // }); - // }); - // server.connect(); - // }); - // }, 10000); // Increased timeout - - // test("client should handle server close event", async () => { - // await new Promise((resolve) => { - // let errorEmitted = false; - // client.once("error", () => { - // errorEmitted = true; - // }); - // - // client.once("close", () => { - // expect(errorEmitted).toBe(false); - // expect(client.status).toBe(0); // OFFLINE - // resolve(); - // }); - // - // server.once("listening", () => { - // client.once("connect", () => { - // server.close(() => { - // setTimeout(() => client.close(), 200); - // }); - // }); - // }); - // - // server.connect(); - // }); - // }, 10000); // Increased timeout + } catch (err) { + throw err; + } + }, 1000); });