fix: improve connection reliability and add comprehensive tests

- Make connect() methods return Promises for better async control
- Remove automatic connections in constructors to prevent race conditions
- Handle ECONNRESET errors gracefully during disconnection
- Add comprehensive test suite covering reconnection, timeouts, and concurrency
This commit is contained in:
nvms 2025-03-26 16:51:03 -04:00
parent 0fa7229471
commit 20fa3707ff
9 changed files with 486 additions and 151 deletions

Binary file not shown.

View File

@ -15,13 +15,15 @@
"license": "Apache-2.0",
"scripts": {
"build": "tsup",
"release": "bumpp package.json && npm publish --access public"
"release": "bumpp package.json && npm publish --access public",
"test": "vitest"
},
"type": "module",
"devDependencies": {
"@types/node": "^22.4.1",
"bumpp": "^9.5.1",
"tsup": "^8.2.4",
"typescript": "^5.5.4"
"typescript": "^5.5.4",
"vitest": "^2.1.4"
}
}

View File

@ -9,7 +9,8 @@ import { Status } from "../common/status";
import { IdManager } from "../server/ids";
import { Queue } from "./queue";
export type TokenClientOptions = tls.ConnectionOptions & net.NetConnectOpts & {
export type TokenClientOptions = tls.ConnectionOptions &
net.NetConnectOpts & {
secure: boolean;
};
@ -23,27 +24,38 @@ class TokenClient extends EventEmitter {
constructor(options: TokenClientOptions) {
super();
this.options = options;
this.connect();
this.status = Status.OFFLINE; // Initialize status but don't connect yet
}
connect(callback?: () => void) {
connect(callback?: () => void): Promise<void> {
if (this.status >= Status.CLOSED) {
return false;
return Promise.resolve();
}
return new Promise<void>((resolve, reject) => {
this.hadError = false;
this.status = Status.CONNECTING;
const onConnect = () => {
if (callback) callback();
resolve();
};
if (this.options.secure) {
this.socket = tls.connect(this.options, callback);
this.socket = tls.connect(this.options, onConnect);
} else {
this.socket = net.connect(this.options, callback);
this.socket = net.connect(this.options, onConnect);
}
this.socket.once("error", (err) => {
if (this.status === Status.CONNECTING) {
reject(err);
}
});
this.connection = null;
this.applyListeners();
return true;
});
}
close(callback?: () => void) {
@ -69,7 +81,11 @@ class TokenClient extends EventEmitter {
private applyListeners() {
this.socket.on("error", (error) => {
this.hadError = true;
// Don't emit ECONNRESET errors during normal disconnection scenarios
if (error.code !== "ECONNRESET" || this.status !== Status.CLOSED) {
this.emit("error", error);
}
});
this.socket.on("close", () => {
@ -123,11 +139,17 @@ class QueueClient extends TokenClient {
private applyEvents() {
this.on("connect", () => {
this.processQueue();
});
}
private processQueue() {
while (!this.queue.isEmpty) {
const item = this.queue.pop();
if (item) {
this.sendBuffer(item.value, item.expiresIn);
}
});
}
}
close() {
@ -136,9 +158,9 @@ class QueueClient extends TokenClient {
}
export class CommandClient extends QueueClient {
private ids = new IdManager(0xFFFF);
private ids = new IdManager(0xffff);
private callbacks: {
[id: number]: (error: Error | null, result?: any) => void
[id: number]: (result: any, error?: Error) => void;
} = {};
constructor(options: TokenClientOptions) {
@ -154,9 +176,9 @@ export class CommandClient extends QueueClient {
if (this.callbacks[data.id]) {
if (data.command === 255) {
const error = ErrorSerializer.deserialize(data.payload);
this.callbacks[data.id](error, undefined);
this.callbacks[data.id](undefined, error);
} else {
this.callbacks[data.id](null, data.payload);
this.callbacks[data.id](data.payload, null);
}
}
} catch (error) {
@ -165,13 +187,39 @@ export class CommandClient extends QueueClient {
});
}
async command(command: number, payload: any, expiresIn: number = 30_000, callback: (result: any, error: CodeError | Error | null) => void | undefined = undefined) {
async command(
command: number,
payload: any,
expiresIn: number = 30_000,
callback: (
result: any,
error: CodeError | Error | null,
) => void | undefined = undefined,
) {
if (command === 255) {
throw new CodeError("Command 255 is reserved.", "ERESERVED", "CommandError");
throw new CodeError(
"Command 255 is reserved.",
"ERESERVED",
"CommandError",
);
}
// Ensure we're connected before sending commands
if (this.status < Status.ONLINE) {
try {
await this.connect();
} catch (err) {
if (typeof callback === "function") {
callback(undefined, err as Error);
return;
} else {
throw err;
}
}
}
const id = this.ids.reserve();
const buffer = Command.toBuffer({ id, command, payload })
const buffer = Command.toBuffer({ id, command, payload });
this.sendBuffer(buffer, expiresIn);
@ -189,10 +237,18 @@ export class CommandClient extends QueueClient {
const ret = await Promise.race([response, timeout]);
try {
callback(ret, undefined);
} catch (callbackError) { /* */ }
} catch (error) {
callback(undefined, error);
if (ret.error) {
callback(undefined, ret.error);
} else {
callback(ret.result, undefined);
}
// callback(ret, undefined);
} catch (callbackError) {
/* */
}
} catch (error: unknown) {
const err = error as { result: any; error: any };
callback(undefined, err.error);
}
} else {
return Promise.race([response, timeout]);
@ -200,27 +256,34 @@ export class CommandClient extends QueueClient {
}
private createTimeoutPromise(id: number, expiresIn: number) {
return new Promise((resolve, reject) => {
return new Promise<{ error: any; result: any }>((_, reject) => {
setTimeout(() => {
this.ids.release(id);
delete this.callbacks[id];
reject(new CodeError("Command timed out.", "ETIMEOUT", "CommandError"));
reject({
error: new CodeError(
"Command timed out.",
"ETIMEOUT",
"CommandError",
),
result: null,
});
}, expiresIn);
});
}
private createResponsePromise(id: number) {
return new Promise((resolve, reject) => {
this.callbacks[id] = (error: Error | null, result?: any) => {
return new Promise<{ error: any; result: any }>((resolve, reject) => {
this.callbacks[id] = (result: any, error?: Error) => {
this.ids.release(id);
delete this.callbacks[id];
if (error) {
reject(error);
reject({ error, result: null });
} else {
resolve(result);
}
resolve({ result, error: null });
}
};
});
}

View File

@ -10,6 +10,9 @@ const client = new CommandClient({
const payload = { things: "stuff", numbers: [1, 2, 3] };
async function main() {
try {
await client.connect();
const callback = (result: any, error: CodeError) => {
if (error) {
console.log("ERR [0]", error.code);
@ -21,7 +24,9 @@ async function main() {
};
client.command(0, payload, 10, callback);
} catch (err) {
console.error("Connection error:", err);
}
}
main();

View File

@ -8,6 +8,11 @@ const server = new CommandServer({
secure: false,
});
server.connect().catch((err) => {
console.error("Failed to start server:", err);
process.exit(1);
});
server.command(0, async (payload: any, connection: Connection) => {
console.log("RECV [0]:", payload);
return { ok: "OK" };

View File

@ -7,7 +7,9 @@ import { Connection } from "../common/connection";
import { ErrorSerializer } from "../common/errorserializer";
import { Status } from "../common/status";
export type TokenServerOptions = tls.TlsOptions & net.ListenOptions & net.SocketConstructorOpts & {
export type TokenServerOptions = tls.TlsOptions &
net.ListenOptions &
net.SocketConstructorOpts & {
secure?: boolean;
};
@ -24,13 +26,14 @@ export class TokenServer extends EventEmitter {
super();
this.options = options;
this.status = Status.OFFLINE;
if (this.options.secure) {
this.server = tls.createServer(this.options, function (clientSocket) {
clientSocket.on("error", (err) => {
this.emit("clientError", err);
});
})
});
} else {
this.server = net.createServer(this.options, function (clientSocket) {
clientSocket.on("error", (err) => {
@ -40,18 +43,24 @@ export class TokenServer extends EventEmitter {
}
this.applyListeners();
this.connect();
// Don't automatically connect in constructor
}
connect(callback?: () => void) {
if (this.status >= Status.CONNECTING) return false;
connect(callback?: () => void): Promise<void> {
if (this.status >= Status.CONNECTING) return Promise.resolve();
this.hadError = false;
this.status = Status.CONNECTING;
return new Promise<void>((resolve) => {
this.server.listen(this.options, () => {
// Wait a small tick to ensure the server socket is fully bound
setImmediate(() => {
if (callback) callback();
resolve();
});
});
});
return true;
}
close(callback?: () => void) {
@ -129,7 +138,7 @@ type CommandFn = (payload: any, connection: Connection) => Promise<any>;
export class CommandServer extends TokenServer {
private commands: {
[command: number]: CommandFn
[command: number]: CommandFn;
} = {};
constructor(options: TokenServerOptions) {
@ -157,10 +166,26 @@ export class CommandServer extends TokenServer {
this.commands[command] = fn;
}
private async runCommand(id: number, command: number, payload: any, connection: Connection) {
private async runCommand(
id: number,
command: number,
payload: any,
connection: Connection,
) {
try {
if (!this.commands[command]) {
throw new CodeError(`Command (${command}) not found.`, "ENOTFOUND", "CommandError");
connection.send(
Command.toBuffer({
command: 255,
id,
payload: new CodeError(
`Command (${command}) not found.`,
"ENOTFOUND",
"CommandError",
),
}),
);
return;
}
const result = await this.commands[command](payload, connection);
@ -169,7 +194,9 @@ export class CommandServer extends TokenServer {
// we respond with a simple "OK".
const payloadResult = result === undefined ? "OK" : result;
connection.send(Command.toBuffer({ command, id, payload: payloadResult }));
connection.send(
Command.toBuffer({ command, id, payload: payloadResult }),
);
} catch (error) {
const payload = ErrorSerializer.serialize(error);

View File

@ -9,7 +9,9 @@ export class IdManager {
release(id: number) {
if (id < 0 || id > this.maxIndex) {
throw new TypeError(`ID must be between 0 and ${this.maxIndex}. Got ${id}.`);
throw new TypeError(
`ID must be between 0 and ${this.maxIndex}. Got ${id}.`,
);
}
this.ids[id] = false;
}
@ -33,7 +35,9 @@ export class IdManager {
}
if (this.index === startIndex) {
throw new Error(`All IDs are reserved. Make sure to release IDs when they are no longer used.`);
throw new Error(
`All IDs are reserved. Make sure to release IDs when they are no longer used.`,
);
}
}
}

View File

@ -0,0 +1,282 @@
import { describe, test, expect, beforeEach, afterEach } from "vitest";
import { CommandClient, CommandServer, Status } from "../src/index";
describe("Advanced CommandClient and CommandServer Tests", () => {
const serverOptions = { host: "localhost", port: 8125, secure: false };
const clientOptions = { host: "localhost", port: 8125, secure: false };
let server: CommandServer;
let client: CommandClient;
beforeEach(() => {
server = new CommandServer(serverOptions);
server.command(100, async (payload) => {
return `Echo: ${payload}`;
});
client = new CommandClient(clientOptions);
});
afterEach(async () => {
if (client.status === Status.ONLINE) {
await new Promise<void>((resolve) => {
client.once("close", () => resolve());
client.close();
});
}
if (server.status === Status.ONLINE) {
await new Promise<void>((resolve) => {
server.once("close", () => resolve());
server.close();
});
}
});
test("client reconnects after server restart", async () => {
await server.connect();
await client.connect();
// Verify initial connection
expect(client.status).toBe(Status.ONLINE);
// First close the client gracefully
await new Promise<void>((resolve) => {
client.once("close", () => resolve());
client.close();
});
// Then close the server
await new Promise<void>((resolve) => {
server.once("close", () => resolve());
server.close();
});
// Restart server
await server.connect();
// Reconnect client
await client.connect();
// Verify reconnection worked
expect(client.status).toBe(Status.ONLINE);
// Verify functionality after reconnection
return new Promise<void>((resolve, reject) => {
client.command(100, "After Reconnect", 5000, (result, error) => {
try {
expect(error).toBeUndefined();
expect(result).toBe("Echo: After Reconnect");
resolve();
} catch (e) {
reject(e);
}
});
});
}, 5000);
test("command times out when server doesn't respond", async () => {
await server.connect();
await client.connect();
// A command that never responds
server.command(101, async () => {
return new Promise(() => {});
});
// Expect it to fail after a short timeout
await expect(
new Promise((resolve, reject) => {
client.command(101, "Should timeout", 500, (result, error) => {
if (error) {
reject(error);
} else {
resolve(result);
}
});
}),
).rejects.toHaveProperty("code", "ETIMEOUT");
}, 2000);
test("server errors are properly serialized to client", async () => {
await server.connect();
await client.connect();
server.command(102, async () => {
const error = new Error("Custom server error") as any;
error.code = "ECUSTOM";
error.name = "CustomError";
throw error;
});
// Expect to receive this error
await expect(
new Promise((resolve, reject) => {
client.command(102, "Will error", 1000, (result, error) => {
if (error) {
reject(error);
} else {
resolve(result);
}
});
}),
).rejects.toMatchObject({
message: "Custom server error",
name: "CustomError",
code: "ECUSTOM",
});
}, 2000);
test("commands are queued when client is offline and sent when reconnected", async () => {
// Start with server but no client connection
await server.connect();
// Create client but don't connect yet
const queuedClient = new CommandClient(clientOptions);
// Queue a command while offline
const commandPromise = new Promise((resolve, reject) => {
queuedClient.command(100, "Queued Message", 5000, (result, error) => {
if (error) {
reject(error);
} else {
resolve(result);
}
});
});
// Now connect the client - the queued command should be sent
await queuedClient.connect();
// Verify the queued command was processed
await expect(commandPromise).resolves.toBe("Echo: Queued Message");
// Clean up
await new Promise<void>((resolve) => {
queuedClient.once("close", () => resolve());
queuedClient.close();
});
}, 3000);
test("multiple concurrent commands are handled correctly", async () => {
await server.connect();
await client.connect();
// Register commands with different delays
server.command(103, async (payload) => {
await new Promise((r) => setTimeout(r, 50));
return `Fast: ${payload}`;
});
server.command(104, async (payload) => {
await new Promise((r) => setTimeout(r, 150));
return `Slow: ${payload}`;
});
// Send multiple commands concurrently
const results = await Promise.all([
new Promise((resolve, reject) => {
client.command(103, "First", 1000, (result, error) => {
if (error) reject(error);
else resolve(result);
});
}),
new Promise((resolve, reject) => {
client.command(104, "Second", 1000, (result, error) => {
if (error) reject(error);
else resolve(result);
});
}),
new Promise((resolve, reject) => {
client.command(100, "Third", 1000, (result, error) => {
if (error) reject(error);
else resolve(result);
});
}),
]);
// Verify all commands completed successfully
expect(results).toEqual(["Fast: First", "Slow: Second", "Echo: Third"]);
}, 3000);
test("handles large payloads correctly", async () => {
await server.connect();
await client.connect();
const largeData = {
array: Array(1000)
.fill(0)
.map((_, i) => `item-${i}`),
nested: {
deep: {
object: {
with: "lots of data",
},
},
},
};
const result = await new Promise((resolve, reject) => {
client.command(100, largeData, 5000, (result, error) => {
if (error) reject(error);
else resolve(result);
});
});
// Verify the response contains the expected prefix
expect(typeof result).toBe("string");
expect((result as string).startsWith("Echo: ")).toBe(true);
}, 10000);
test("server handles multiple client connections", async () => {
await server.connect();
// Create multiple clients
const clients = Array(5)
.fill(0)
.map(() => new CommandClient(clientOptions));
// Connect all clients
await Promise.all(clients.map((client) => client.connect()));
// Send a command from each client
const results = await Promise.all(
clients.map(
(client, i) =>
new Promise((resolve, reject) => {
client.command(100, `Client ${i}`, 1000, (result, error) => {
if (error) reject(error);
else resolve(result);
});
}),
),
);
// Verify all commands succeeded
results.forEach((result, i) => {
expect(result).toBe(`Echo: Client ${i}`);
});
// Clean up
await Promise.all(
clients.map(
(client) =>
new Promise<void>((resolve) => {
client.once("close", () => resolve());
client.close();
}),
),
);
}, 5000);
test("command returns promise when no callback provided", async () => {
await server.connect();
await client.connect();
// Use the promise-based API
const result = await client.command(100, "Promise API");
// Verify the result
expect(result).toHaveProperty("result", "Echo: Promise API");
expect(result).toHaveProperty("error", null);
}, 2000);
});

View File

@ -1,8 +1,6 @@
import { describe, test, expect, beforeEach, afterEach } from "vitest";
import { CommandClient, CommandServer } from "../src/index";
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
describe("CommandClient and CommandServer", () => {
const serverOptions = { host: "localhost", port: 8124, secure: false };
const clientOptions = { host: "localhost", port: 8124, secure: false };
@ -18,100 +16,49 @@ describe("CommandClient and CommandServer", () => {
client = new CommandClient(clientOptions);
});
afterEach(() => {
if (client.status === 3) { // ONLINE
afterEach(async () => {
if (client.status === 3) {
// ONLINE
await new Promise<void>((resolve) => {
client.once("close", () => resolve());
client.close();
});
}
if (server.status === 3) { // ONLINE
if (server.status === 3) {
// ONLINE
await new Promise<void>((resolve) => {
server.once("close", () => resolve());
server.close();
});
}
});
test("client-server connection should be online", async () => {
await new Promise<void>((resolve) => {
server.once("listening", () => {
client.once("connect", () => {
await server.connect();
await client.connect();
expect(client.status).toBe(3); // ONLINE
resolve();
});
});
server.connect();
});
}, 5000);
}, 1000);
test("simple echo command", async () => {
await new Promise<void>((resolve) => {
server.once("listening", () => {
client.once("connect", () => {
try {
await server.connect();
await client.connect();
return new Promise<void>((resolve, reject) => {
client.command(100, "Hello", 5000, (result, error) => {
try {
expect(error).toBeUndefined();
expect(result).toBe("Echo: Hello");
resolve();
} catch (e) {
reject(e);
}
});
});
});
server.connect();
});
}, 5000);
// test("handle unknown command", async () => {
// await sleep(1000);
// await new Promise<void>((resolve) => {
// server.once("listening", () => {
// console.log("Listening! (unknown command)");
// client.once("connect", () => {
// console.log("Client connected, sending command.");
// client.command(55, "Hello", 1000, (result, error) => {
// console.log("Client callback CALLED! with result", result, "and error", error);
// expect(result).toBeUndefined();
// // expect(error).toBeDefined();
// // expect(error.code).toBe("ENOTFOUND");
// resolve();
// });
// });
// });
// server.connect();
// });
// }, 2000); // Increased timeout
// test("command should timeout without server response", async () => {
// await new Promise<void>((resolve) => {
// server.once("listening", () => {
// client.once("connect", () => {
// client.command(101, "No response", 1000, (result, error) => {
// expect(result).toBeUndefined();
// expect(error).toBeInstanceOf(CodeError);
// expect(error.code).toBe("ETIMEOUT");
// resolve();
// });
// });
// });
// server.connect();
// });
// }, 10000); // Increased timeout
// test("client should handle server close event", async () => {
// await new Promise<void>((resolve) => {
// let errorEmitted = false;
// client.once("error", () => {
// errorEmitted = true;
// });
//
// client.once("close", () => {
// expect(errorEmitted).toBe(false);
// expect(client.status).toBe(0); // OFFLINE
// resolve();
// });
//
// server.once("listening", () => {
// client.once("connect", () => {
// server.close(() => {
// setTimeout(() => client.close(), 200);
// });
// });
// });
//
// server.connect();
// });
// }, 10000); // Increased timeout
} catch (err) {
throw err;
}
}, 1000);
});