From 7170d1bf891b2f7003313682ff5f039e8b8310fb Mon Sep 17 00:00:00 2001 From: nvms Date: Tue, 15 Apr 2025 14:33:20 -0400 Subject: [PATCH] redis-backed room support --- packages/keepalive-ws/README.md | 39 +- packages/keepalive-ws/bun.lockb | Bin 110110 -> 113505 bytes packages/keepalive-ws/docker-compose.yml | 16 + packages/keepalive-ws/package.json | 1 + packages/keepalive-ws/src/client/client.ts | 31 +- .../keepalive-ws/src/client/connection.ts | 24 +- packages/keepalive-ws/src/client/ids.ts | 4 +- .../keepalive-ws/src/server/connection.ts | 2 +- packages/keepalive-ws/src/server/index.ts | 145 ++++---- .../keepalive-ws/src/server/room-manager.ts | 192 ++++++++++ packages/keepalive-ws/tests/advanced.test.ts | 34 +- packages/keepalive-ws/tests/basic.test.ts | 20 +- .../keepalive-ws/tests/redis-room.test.ts | 343 ++++++++++++++++++ 13 files changed, 720 insertions(+), 131 deletions(-) create mode 100644 packages/keepalive-ws/docker-compose.yml create mode 100644 packages/keepalive-ws/src/server/room-manager.ts create mode 100644 packages/keepalive-ws/tests/redis-room.test.ts diff --git a/packages/keepalive-ws/README.md b/packages/keepalive-ws/README.md index 13554b7..be5b0d5 100644 --- a/packages/keepalive-ws/README.md +++ b/packages/keepalive-ws/README.md @@ -26,6 +26,9 @@ const server = new KeepAliveServer({ port: 8080, pingInterval: 30000, latencyInterval: 5000, + // Multi-instance room support (optional): + // roomBackend: "redis", + // redisOptions: { host: "localhost", port: 6379 } }); // Register command handlers @@ -41,8 +44,8 @@ server.registerCommand("throws", async () => { // Room-based messaging server.registerCommand("join-room", async (context) => { const { roomName } = context.payload; - server.addToRoom(roomName, context.connection); - server.broadcastRoom(roomName, "user-joined", { + await server.addToRoom(roomName, context.connection); + await server.broadcastRoom(roomName, "user-joined", { id: context.connection.id }); return { success: true }; @@ -101,17 +104,17 @@ await client.close(); ### Room Management ```typescript -// Add a connection to a room -server.addToRoom("roomName", connection); +// Add a connection to a room (async) +await server.addToRoom("roomName", connection); -// Remove a connection from a room -server.removeFromRoom("roomName", connection); +// Remove a connection from a room (async) +await server.removeFromRoom("roomName", connection); -// Get all connections in a room -const roomConnections = server.getRoom("roomName"); +// Get all connections in a room (async) +const roomConnections = await server.getRoom("roomName"); -// Clear all connections from a room -server.clearRoom("roomName"); +// Clear all connections from a room (async) +await server.clearRoom("roomName"); ``` ### Broadcasting @@ -162,6 +165,22 @@ server.registerCommand( ); ``` +## Multi-Instance Room Support + +To enable multi-instance room support (so rooms are shared across all server instances), configure the server with `roomBackend: "redis"` and provide `redisOptions`: + +```typescript +import { KeepAliveServer } from "@prsm/keepalive-ws/server"; + +const server = new KeepAliveServer({ + port: 8080, + roomBackend: "redis", + redisOptions: { host: "localhost", port: 6379 } +}); +``` + +All room management methods become async and must be awaited. + ## Graceful Shutdown ```typescript diff --git a/packages/keepalive-ws/bun.lockb b/packages/keepalive-ws/bun.lockb index 0e583031013d658ad3e25f764522618f8dc59e59..4866361ec460ffd925ea01c7a59912bc56a752ec 100755 GIT binary patch delta 21474 zcmeHvcUY9i_W#bxB8wEeG#e@cN=KF=E2vR$v0z)!SP)TG5s(E9SOBAmCDu`2vBeUN zy~K_Z#f~jv5{*WQCe~P@DH^eTKj$rhrTHVW#Ly`^BbINURuah^t7I9Y@GYj zT@pl%s3``BsL7;U&k7`sZEvVTv+>|;jP1PNG zs=x-695`sHQaOXBCuAn$eX2<4?I1gV+JdH}Wu=ooK-RanlJs+BdcT_VzD!eUcCI-+ zTh&4~OiWHNp`3~;9+QxQmL#QnC5=uw1aHW}?V!}sr*Z|~f|8@{kx6=L8I>PqP8ySx znrTi?GiN56&8kFrO^(HbT7#x0q^6mMs#HnlA-P$ml%cA=C{1120hD^c2b8+Jp{&=+ z@=s`Hedw=)lHF<0dZ7D2sr(vHDmNRH?8eFRP|yaDyUF@6P%7Wj4*gFB>OnyjFi?{3 z!(FQ2D^M~#0!lC13QG9}a{d%qHiJ_6jI@-LtaO#C+(D&k2)PW@1@tJW6X?gXeio=B zE7RP*>0-(8i#hWaH0K0GCD2$cwMlRC5+D8=$)P%3{*rel*$iODKe zuF7OidWV8&Y)dJoy0}XfQcQY{Ov-kU3wU}+j*c>=n$i=@W_#$#@$1kdTJqX>N|VA; z)}Mn+FD?cp2Mb$C_A@~#CX-qd%~j>~gMwz+Qz+1hdG|rd(?&A=#!ITe+gmDFbww5R z4D*p3`C~m}(L?0$U{HF`Szm+%EeCD(X zEz_vd_Qj&MXLHMpa z?`PFbJ+vp^VAaX<&$?8Z_LaKQN!v)>q9+g4y5(-~Dz)$jqcq?*WqJve1_skaoMU?} zr2OXb(m>fCYF~Zq(~Fa;TSQOTJFD&Kw!+O@=cAg`KbhLpuJrTWyPTVT{k&_VxCJ%r zTxa!*>}BV2@#WV#b}=vdAEr!FXEG%md$!s6?ntx)bWpY9{qCHrP)#Y zd(Jp-4(zD8>=@u@t)1POuhV|DV~+JYE87up1=BT818T{cS1Z2FTWiHssvc0Pq0ngu zLW+P?jThI^X_w0qmxG+u@X)$? zO?6AOfEU#X(}aVI5Dh0-^76WR^%YAVT2HTTU7hFD(`yD+SE<@kwq`N7P|@&<>O8c* zUSo+Ph?u4A4z4|XsK)c_>D0@ucsa5!Lm5Q&T6-*w)LqqhWL=%+El6T9b-tFDH_&TN zQ#SdcWtd>WFw~HtI>ee6I_TA7t$DeFUR#7bOl`E9+G&L)L2@Rx1}}8ft5?5hYo5p!!}QO5D8k~3}uLgpX?23NV)o-Fw z8L6y#R1;p*l=BQ?D6^Q<0t*Ifki^HRqwN^x7As zkRr3LP8-rfnhO>@G9uUtLKiVB-$SSVvIP%qt=C556w_M@M=TA?Akl2bVgMIkxbbqB zgy6tLRaU17rCkBZACg*}KNm<6r>=!NjXq5rBRFXS(rQxxiKY?O9>ja8J1_Lss{=fE zIY_Dp5B1S&*Lq<7z(mVg1D*Ba=OHq!Fc!kZ4h~;Q3BEt*w_z)mN;VoKA#9O|}wi zSt(0c6tQGHfYhG9a=>Ed&CC7u+Dvc70a`(~K%!Sx=d5?Il`Kf_ck_|nh!q8EWhNwQ zno3-1_d=pKYI(7TPWx1rEP14}P8;Yey#OPRE==|1p>6c)+rB)fjb0n$Ck-l=GasF1 zA*4<`+&N5r)sKe;>9rmhD%l(OHW^Y3UV?BySDlB1nrRV3=codtal*K^3bukkLx?we z>eM*_yiljtY=yEvFVcl+tOIEp5SIu}(tQf9m#C}NhNcVUrh+qx+$C@cA{UGpmU0(> z8!qZzf=d#)1RYH)DtqWRt}((#Dls0Ml>0R}$%DWUnsg|kodJ#pR3i@BX-GIH$V2an zu#qB%y8dmj)GJcU7eRvaQiN*HK!US*HN^d}9?oerEYd#O3zAezvAa%t7!uWj079UG zqb%w^oi-AZR3%NBTu5Yyn8b!#0+z;Iio*IhK~g#78G~6k&uOREPKJ(ND#giRNa!DW zO+B4Pjf}Ru2+^n4^PC91dXAnKf*jHFa**eG9vZ3FcEK@|xn$MSayI9G`+MD;HcqJH;jQqj*2y_w{+lz zZ|OC+pzO$t-U`!rpi8m*fJ*}xC31VfNx3h6({*@@wjr{c4NfZYB{<2>5#1z}O#mm^ zZT?O73pg|rWjl0Ma%Y2+YXK*fZHOgQsxJYYq+1V8s_z$YQi-r=+9s*KDc~f#v*08< zN37MdFW|)b)N8u%a)Vxb2@0%U6l{p*YB)7hKjFZGg)9V;G@!+p)iZx59fO3PkjGGq zrGO-{M>PW>Vb8|dY8|*>9$r68dtJ5=XO!BnE6?esSLb%+g&<`(i-Xke#zVX7H9axa z+VWQ@GYgy)Cls!SAmKpQDokV79bWQRt-~}O!3B#WJf1kwA6mS2Lh|87-eKC8;LuA5 zSc=1rJ*23_9)JVqL`YP>c;HlT>%q%o_1f!DdP_=dZ4F|1PEWnMODr$!sn_PkN)x&! zk8G{e?t&y2M`b_#CQ;V~^^|InR^2p66;ppRB*|44qto7kB+W^3(7BhCCk;n0SrR)< zvj9>ce-#m?DG@nrkm^^xcwryC)~mPV3N|iGJqYM4d1y96!mOaFhx&hjMAHZ>29|A~ zKGNHyNje-7O&%+85nl~SUZ7C!8YJ@4Qmmy;-%91whQ2&BPOqNOm*>RkwI`5ATLGqE zfKIJ8@^UBx@V5?is@N=bf{~8kS`MWYLM&3JE{A1ny(TygUB!#s!?a_-k>k=CXKx%Y z?5Ed0fRg&IKF@EbQ~SpA(Eg#h)m17LK@DhS)CQ%8C`w0*GqyA!-i{l0=b`LqlY_DvDFHYMh+^?^Ch;|4jwboB#b6%jM+v|E>k~ zkvCUv(f^v}{;yw9Rs9skYorEt6B(Kxo8%T(rZo0j0SdNlGTjbJPgT^CdnGr{rBVj~ zDpdr~LzLuCNx)N?sv#dDC7wfKDp9g828bS(^+d`3s4Nq;1Wp0;-m~-xEEPBh&_k3m zN=d*|nUdWFfavD{Jw(a=B0%~}06j!We}x1*e@+onWOxlAPrm`^sZ6Qj>j2q(2hc;5 z^fyVsQ<>5_^;FdU4T?oveExQUzhlp@nHuKpXD{n(%6g(|zIjAresx4maqiZYbBNON-cXh+Q&KgO^?yal zq_J#A)B&Gh)>r+k}+BckmDvmi~*H%8=kW(c8bIQTlLWN2pQ-}Js@9^`KXNJ#T=iZxf zZTxqW7C2}xY%oP{xcT1Lc_Y{O%qel(5z#y|d2Q5$lk?Vp`n}1fVB!xBVx_O`UxU(e zuRInTCdV3@cUyLR$E9$$qUlrXOlv>(%9zgAT<@mzExVm|`%dlsr+YtW)Y)-BhhOs2 zJo+qgXwz4i)m^<{^k=&-{CHIT>tp^Z)gjlW;5fVUan~y`H79OS&F}L1?k6?#U&Z&Y zyLi&$b9d4m7WfV=AAkOFaom$jQEX^AveQeZ{V1uVQV{FI=*xI;1kREecS8(yM5X9LDHGS zHSp7k-4gFtjEIIqtgq?k!o=SEf+I7EW@}*5z4?3QnbazDaGInrQnffIw zk9NB9;h?9>uNW64@B!%#F%5fLTva*0Ui#LPpH-_o*MDk1LkoQ{d|SPKwil^Enhj-$Q?%8a?Lx2t~C@# zK`lC-`nkUIpaJ(jnV#e3Gj(MAaqaP>@~j&JYB)uotNCz`gWaXuSFN*~zqmBAJ3=K`aif!&s53($r`B8^mRu>+6*!aJ9>(R!R zCm!Wbx9+}c>L@UTd6@`Ee6ce_o6X&YXGG>A8yU}QR; zjWn2-A`Rgl6OAmC=OYc{myx#Rez`_`Z8Z(4p5H*)j)&wKSp=VpG?L##+MY+|8(9Zl zfV3ljg7hsOJ;}&A@s&tB^XEvTch zKBPT(t@n&9mYb0Fi_nU5H1NgM*7=@X({NZ#18^l9q82P$c==m82mcZ{pikpp||G>Z!dBF!5hdH+V z6{I8{J=4hdLfSmjz=reZkdo)x@&U69%*5BvLeS5%<+ig8Y$T7LZR95*6+s%swQ~?< z+>WQt>B7?ZK1lBK5m$2!Y&18`MU+803(3qK=OM}#Ag<;aSQamV6ub~|#SLsM&*q3S zNY@}`bC3CmG68Wl-@wN4%aEcLA+8n}*aSXp0on)YA*5U$vJmYnK>HRNSU$f8DQ+>^ zCm7hfyg)$vmY{u*rts)RXdk4_iwx|2{v1;BQnatYz^3u_1!&(gv~RJ2&EWBi(LP8; zkY;l260~nQ=FJiVo6Yw@a{mzRTWVl)xoIid2k9&%&K;MbeJjwuWd^o@mn<`~g}mu< zBNKQw(nY)!X#w~6(8w0^e56bGWu!~F-wGpJ#-|}&&Tk<7kcX@^vK4$T(v|!k(p5Zi zm65IH1xP>QPmr$R(W{MYEnkWBWBwfJIv)Fxk*(+Jk#68@jgf8S@klrE?MOFs?OL>B zJ=(F>z_#*zklZ(*9UmLmc5eC@?SOO^(hlyp4(-^8cC0h7UAzQR@FtAYdIQ_Tv)5ys zAYFsBk9%yuIBmu_Z7{I?{4%7dEf}Yb23EwUZNxZ1dI;$t57~rq+KO@7WMIYo9;CQ! z7^lq!c7zvf#yD-qI6*qbqqkt3AZ^}aU?=!@u<|JRj-T{4&z3+;6v$UE|Y`mhl@%zu_T!jO;p}i}VJ+hxA(>x!1_P z;{`}>@+U}d@#uXJ+eURJ_VSI`V>S#^;cMz2KK2MIFWX6dTwpKCKwz1L+~8H$3Dp#^)Hu=deM|82|ooUmkZH z19ZfouEzM{BYnBm35*a#4dY#o_T_saZa!*IS7*E&V)99h(J=$l^7Y3sMkN@d;|5lP z#~;TSK`Me|%e5!a##3nH3EGkNL2^HhHlCy%$#fEJgme~C9qw3yHl9HnOK3+bffRfe zZ9GLgQuZmd5z;kCj@;uk+IS9aJWV^&Wk^w5XJLRVdi76)U5#}9DFRW(#VeRU+$}a$3SmPgD^8DM*w#sh+1RNl0@a>n4 zeEb(yj*o1le;P`k*x_IOi~s1~4@<3UoB=$ z6|+)qS~;4#O8-QZJT)hc8k3ZPZ#|&1rM6g)FneVuj7q@=x%|=1Pc(M1__Pxl?Wnlz zaMSRlOcc8VtqmL-M_TJqBk`|?(^aY&z4*l2ZW<+XH5~3CzIDYNE6uaMa$i8(ky|;s zi$4E9mXPn(5gs_F;hEe@XxfB@(cizX-$)Os9|B}RAJx*M$kg4+;r$VKvY|gUI!m>w zevoxkjy`)M^<$Y*xq1LSPXMZmK8tn%#J|6P0z(Dp{7vUtGWZ!F9sOC@SANMavW`B$ z?j!4-$~pwN>TOx~Ox96d7;Dk-UqR^`4+OfZCqNJW7?ZwEvxPw44wI*~P)RLNI{|mY~$wI@EqMmB8aI8`9^{ z8M4kt)=~FO22Yjv$~x+1`Z%9-ezL9s_+tP){?L(w)anzYz!NCj(bw7~q@eC;10{{6 z6F^@mQYG{?3jP;A5I+q{1wv#UjoTSn7b@$vwIXYkYN@X<(UpZp)A-Uy{G9-bktiS< z=mHpk7@#ZA4VVjC4$KD@01E*DSOgRZ#m!mcTvxEo04yo=a~o9)pdru{p#EwEGy%wU zYA=0`PcBoF8UxgSQfkw61a15JZM14#o%f11;`6*GZZz-*=(q3Q>w7Z49n zIQ9l81dYG|fI^SH1fnkr)j&1C0?+`l0DYm<1Lz2}1E_zgKf{2wKscZWbV9A>OlR8~ ztQSBr;UknbV|8+A=xC&Zfck(lP!Dhf=x?vO0DTbP0?^RC26+j*0BDlj2krp3fggbf z0L|`u!1n;nR+`060GfQi0CxeJ?U`vLk8$OO;?qX|Xdl_dc)09%HMVh5%=Uu`@l4SzSTbfq@sQ^eLY39 zG7U%vXqm7Fw7_>TqA$dc0EdA?z(HUuP#gKyz)|otGmir&fRjK8Pz+oKz5>nz7lF@# zQs4qmAE0&2H4c^30TT=K0BCI@IR*MeAb|-J-Ix`#g0$IQqU@Y(s zkPYMjgMoK}iNFNl17I>R1;_^`0n>r0zzkqEFbBYRsNMr+0^KiN0~N*ieD zQkFQ%CtE7l9B2l(0#rv+U?5Pb4YkEJGYU#?z!UHwzkIY_U|R#Ngee}Z8EYrpb7vhq z29N>pHqamF2gCz$fDz~m^Z|MceLR>?_%pEd!Y9CE;1Td8K!djfpy{_A*a~a`mH`4V z3`inB>f~~;^MEuU6)*wGz;GZ17zK;~M#?e;8wKBZARC|{r(ib&6zuN+6zEFOj|ER< zDA*?e6zKUt9x#iBh9dm~;9Xz_FiF_#iE*0-b~-Q@fSqVEQ>HWlW&;a>`2daO0+}uX zT?#A)3VGp^is7}tRCAY_0cKlN`M0ZEgrjo zoxpM65KshA-fmzoum{)&6axE!Pl1EL5ug}23{cslz%k$?@Hub+C+Icmb3HuYi{TP55R&Q@|Cl1h6~c*NR|h3#JvAR_Z6P zwE(RK(BvW=8IuidD?}-uwwcBN#WYpg1fb1Usm~TXZMd}c)C6diryYT|DB2ZBXHR<; zF-k?WYgc{&{UaXQt!XEx9;fJ~9;Y3hcJc-Q?d-J2DULJ;?+%cCOHen!6KDlQ$X{dC z9Si}FwNT{68VlFG*dzz#$MVWA>C|js8}A@*KVh&p>yf(x*0nGtl%K^R*UuaONt3rG za+Dv=$vM6iKciQEVuzeS?*Q*W%8`C(FaHX!3_1R&4P}b)9(&ZL{7z3Sur3Vn+l8{{ZI_Ct=Jw?9?)NX}7y4k(uKr5p;5x+tUkU=TU73su$D*jp^5 zTnM22SWqq$pc1xvGcR>XHKEL#O%1wSP23>zew|tWow_As!$h+XdaLr+_ThJ~?>`;$ zDof1*P;&@;xnE6K>cbpdlwamw?>uc>!|TBtP?kCxWtoL=o-CA01QzZ#thUk}eFh6( z?*LzKKb?bx5aG`p)GaK8M!w7;NVz`XmCqzY?z?WW6>p2Q5Vw-N-J8A&o;<$E(Tbez z7D5Wjx+tF)xVSlOoiMV^W3dfz7C~qstng+2F7KlbEqZWpP4BGO)1epYGEcOFo;eo6 zBYMB`qs_TK{bsuD3J+Ro$TvM(>I9a`i(CO}|!xcFN zw!#CHbx|%)a40YL>v<|DslwujtudyYwo%_ z4LU!ouuyK%IQcZM@%WGW@QR!c_JUmqYFB=d9u#wC_sV%Yb1EzlqU}Qv5z2iL`|H-P zJz#dhBnyT`SVt4Nw7xJ0bp$ED{*PV!=uyOq($^Ih${iFJ{rA6_`J<6l-<4LiauBvqS>-;CdHF8!6UY4G zUSW~$AT2_@%MPx5|L1(Cikw*v!b|c@xoDwbPIq=|{FiwZ7DpWf-!Rm!Tnl3VZK+@D zm#1G=SSS~VOwCx*^4ipB+lrk2uENAHb~mWXIVvs=$~_C+pC8$Ez+%W^%-$gE515nST%EDv6s93Y`u$c~z=rQ*pGXrBu1^WB0N28cVv?%Rt!x?|?QKXyrx_&&6Bj ze3X+iPFxfGz5Q{jbQKzf(*o%#1c$R27v(04CH3}v?=pAV>mPT=1tYHx-wI#i|kgjK* z*85;Vp35bK1A68Zq}=v#$fDjg>r+yt9; zcx5v;X?1q2T|1?BGy5;(vIzA6Hz9&7l*>wbOiVd&b^DG@u)tKH$?(`s$ZUrOj%_Kd zYloG+i@Wf&9rM)J#0k_1PAa#RJU%)hed(n$pQEH77ETmZ?kkBu_(-#~?4dhypr%1< z<0%Y`K(G{JW3)vb%QwusU%%dkDX<8J1=^(CX|gW+by~m*XDmJFa6ekm`?V4_p{&b< zR#KaOEgzSBIBxzLH5&#EW=`JBR>E~yRPDT5oqW1tX^ZV(k(2wJcTX&y3?}{X_OFy)EknKLy&Tf%7dZ(`dn*! zsH5nW_y*;|mQ87XJ5wGdE=LX>k5KD3Uc&20^rUiSOKIGO^U*(lcuUPD(&-2>P=((( zyh?rxG3^oLa(}6lS_^rVWS1)Z3BkZ(bcXo|=U^G6+~acN_1P`%fwqg_7o9KBCh^J_ z^hx%UU0;3O?tnB6Fku2!ICJ#q!0I>hRVlZ@4318$;X17k^_zTdm24bblzU-nIUW-n z&IezFg&dVh{=(!Ah{_CqVJ}fE>JNzSt}is{2+x#Dfp)!l@T0ZUsTQ=@i01)SRg1cm zgQYu_%2u^>OBPiv-BJ#g?gJ=W<?+EtGo zmNB(tXho~4T9~D>We&mzDC<(y@+(<5h6;z?!t%?)q;soZyN_Cof47#CI3E7Abm96M z@z$AhH;d92&HCjV32BlxuIo8b=Mxnb!s1mWuu%mitBs zpG4s};1?;i{$jUIAK#ySBT)29oMWMp3#0L}TW&&GFI1&mf%9ok(-HOS&*#99CL@~K zF;eIT%b*^piWY=1M*oI0$9Fpg3wecq8#%PRtXa7Hfu_meNbx1&WHdzzOQTs#)u&Lg zd#XHzia9uiHtK@+R(T2)EtE^XmOJ|vets{ppLiM)bH1__Qc%{V%2TLlfm7%Ts=dln zsAy5;DOAkCDfAJQt@0EqS}6CIm9DJW@TmK}?iHSmvlH4H;8~TYP_Zmdq4<|ie{u>H zEv{qzuY>-cYOyJKP_58+Dr*19PQdr@4nfNOYdq{hvk@~FPpz=1^7JXz>SHh1#K1G< zPB*Js`c`|{{4*70``QZ;F>IKNa$VE%_x5*K_sVW9mSy}!K}+apgK#(op`2?Fp2i@A ztDF|%Qra~}7}yn_Dt8o(w;Zq}{DU>Aa@=7PsIshh0BGJ-*whto+1^#S-jxkgvoIm5 z8{}hMh4I}WM|InCvl|Oy>hHP>E^kjD_ z4kMN8xvr0x+ZwC;@_u42B> zrU^^Tm%+W+V)Z4X@Di5F>{Q=03ch{NTgtsjQ4e;me*626Bjh<4j8D|?2Xqbir~L7) z{_W56=JCQrl(kNlKLv765LWbIQNry$=(0Uc1*g8Ok?WhLQb?WpNeG`95d;! z{b#y4YdzLF1~|J4{a_cW#tEKY^L3wb9~n0`akPa@i$Z>bK6Y>F29`NOt_k^A$e>>B z#)MvqOJL$>1SN&SCwoGX9!khn6H~Ix znMoORlbn}1B`q^0ZD@iy*?VZxu!O9XOlU;WWKPH!o;8Z@Fcj~A^h(OeNXwA<^n?s^ zQieGb)g@#M^-8cbdnG2MqzHNO%+*uYNva_Ui6h}vg=vM_%U_6H&6+ios+X#SX=TwX zK&UmIHEUeigWpUfhrBRLg#1ygsmH%M4%2^CqE32ki$65G!dB@BFPsF#nrlg)w!uPT zWXA>5Al5->xr#Zr_+yvC^p7epKcN_}_5PRPAX|B4s4a+E44QO7ruQh}$x7C6&v5)B Nqw0G$4PqJH{tr=VA)f#M delta 19268 zcmeHvd3;S*+xA|Y6FG+sC`u2Ig_j$kXkFWceYu&@T*L<%b`-tz-ZU$N#dS6hsa|Pv2`UMr{^k0H< zx?`Yhw@Z`r3zmWLgyK~_Vmv5kFjO-r=mE+Z*mOA%9jcRm^x5=ja)e5D5y8+22elH8KCt*M}P)`#(@Ta z*3he-h z9_f8Z=MFkNZS-JHk%>xkhYbQ{cWr{ka{9VD9XBF%$Os8Nm71M0J_QN3A%gwvWmKLs zWPfflDkU!w8F0ek4YZ1;r)H#PCTC}Nfu1Yw+fYlF1 z#o!ez4`GUtQlNla6*O<8xlAx97uXjDOnn+_8H@tugym;cmd}b%ts?)N&o=ZBS70$H zyQU=!9+3|w3k)mnHPL*o6hp!V^=qopO`z;+Iaw_t(NR)nZbsga(L+;~L(eTM-v|HN z_1NX|QJI2&*Vj}ZL%M%PogBN)?#B77&#jC&X6Sbk->SA)Hw?M&s}Vb z(hTo`a?4|)HF&HRn24sM;4~))8$w^I5k<#;1(^H2u8J6)s zT^RpjLHyJ8(EguW5b3*0VbAZ4(?)2cP7C9;Wg$5uFFiSD$cXuH4IaT{yVhZYbJH`i zd`j6V>0?u};M-ZF(=&6jrRff>!$zg0y2jf7 zY!r+od1s!Q3G34(i5BU}&P|Re?J70Odi(TvP{95n6n5k~Zko z_IIc>?w4~fRH_(1dDrW$k5ZiF)9SBJlUiJ@Q}WTuZ6BQ7e*TTPtpSzSxXL*a71p+y zjw1&<&8!_Gd&*?-w8@Dw*+JIJROo3leS@t(lH5FFOg!q zkxIO5rZyENsWnHN^1!uX&b$>Irba;pDy?NTT{A4qVb}*cXnHTJJlRC{IyTcTC?k=q zwJS4f>}*e~sUG%zHDB`}aLu4M(QFT^=^aRF#mra0Va~AL(<*yaB8!hrPO3z9ADd}W zB}uZ;Oph4Tx8T%T$<18J;%k$~x>8{+o4m%AN2(^3RsC(?2m1x`z`^NH>;xCAvtSFA2tY!Nsuzc0XPX&Yig)bbkz zPSb4#r=@rdPD}ADJW8v<8{jm%^WZeQAgok+UBIdN$?w%9dwrX!7k1db)U|$$X%o0U zD(8x=BvIx1gVSPnf$OR2Dtk#%f0Y{nE>-19!3|Nl7T%JSta5X~X|a#MX({^RJTqL4 zJqRvEF;L}Rdcs`;r={rNW7KLPIL+?2C%T?E7dbht{EjabHnPdre5nM)!;dVD zZRP|&&9#K;S}*xgVPl*8o*$KfNd9CAwaFv=$qr%)phA#g0aOC=WdK>4*v!pwFkw%w zNV6MR<+WeakA=iDq$0bVc^4!s2pkt` zm1_i%rK!#Q5)K)?RHZu-T!e)64pWVjN7d6ZLvjyfrc0Hm)XQqV1c@i5%xyANs}D(? z-|fN0GH0Fxjtj3qobx$IJiQ>n`>Qsf!f>0M*?>yIZRT|iw75#-UfXK^3KDk30yA~5 zW0gZ2@(eT=;Ox^0O0^5kA3$OgtO(&&`I%s{H@BJl2Wv`Ku8}z~M3aywq+yVF?zvER zbi{gHGO5yaNbL|;k-Ga@&Gtr;l%!_O>6Sv`?sQdixvWdDM4Ll!it0dJePZO?##GYM zX5In?*9ykyuA7j!NK^n)!_cQIFa#18X;K@!1roPS;uXn!4-yw?rrDuZbMq#eSgkkWDk-eLeCE8{l*7WJ>9DvkC z&4~NT4eKHP*fG#MZNTzK$>bhpHRnR&QH1j~vC2D}QDG~a=@yiIX=bY!Q|EA=7tAdM zr|JFyuBWO?fQR%~xpm-DRnDV1&rD8{3r>qY1#Y;ii*CX5kz<#D8>n&*pK$$J8Yzmw zX|Z1LeXRy#z&SaE`7k)Wf6+Ww9NrSp1oYvvQ7mY#!@gDoi3dq*`B_LPOY>2)8J&Pq z3M%sentDLOeDjDg&jP3AFuSo;ejG(5?QG^w(V9~0v?-9ZjCiW-g`|4|y4MA1d9*d> z>j{Z7Q{7aa*^2BPY~};dunTDJ@fZ@?gAw4&+Qw*^VcvyBi5RkXw8_U~s1O7jTnR{I zYqE5*$uGC2LJyny97>KwYP1mE?SoTbTSzj`EpuN;Jkpr$sMsP%TopJMI_(@JZUVN$ z##XZh)#2H$&E?j*g!=hdO~sHB)VW^?S$9zTC?1?Xr(iLIC3Q_W0*+gTiQ6v96&Bnt zE;KtLsv?9|Je$mu!Et-l)|<9MisC%wo9(DD&St9Hp0|#faWST0;9^y7J-9fP`}s+1 z6uef`6@cra#vTKwrKr)-$R!D!X14^KX7}9_T?B?IK}}l#PR|9Lp3AdF+ULP(wOaB- z_YF8LMFd7bOEDfC&Pk~KL2z2Rmd=J<5;(ms;MDx&FFR97g3auKWf?02&r0}hJ4oC| z7&(mUSV-D9xdi6gwYO*F*~rtXr9Y6Xey*A zb%cwV)9Qq!>IX?;ycF+3it4Xb)or>kcUm5*MdU&6SXN-e|d8fyCXUP0czzwQ8b0NcSA1 zC+Xx@dXlBDP2Sy;?0s$KioLW&--SwBTIIIAs07MPD7ib-cF9Yi^s<>xLaDh>X-BKs znyB@WdMGgEvP3f*#rSh?j=%Tf15cZNvl9@>fXUBrzeqRK&yNgNs<^c;0oZ7 z#V>{=0n`KdVao9hSiti%tpGU$V0-My>hm<^bfEy-HC1CZY66cX&7vlRdq@&p8sM2< z5ezy|;%F>S>hrHqwnw+94^yrVX1w|^tp=dy)Q2fo$Ek@=(|lFM5jdVonYzLMJ*t)e zzfy2*H0C9}L4Qj1^8ecKuQK{SjL-kmn*9qIdcbCZZpX9|uuPYqrabm50rrtqI$aHl zCtuQ;KTsFMtXJbT%09hOmzi>gTL8}RU4S2^EN^82&(oCCZ&$T{q=Kuw0}+DuCHt#W z^L6$@H|b~?qh)IVSpc|Y=0DB{V{+armX*j1w2nt%@>|hsD`It z#PvVJ)_9(#obg$JGyD|bhbikXuz=@jS_AS;Ra+j_eeur?w3PqJcwR;B>J_@n?!?a% z1$Y3UaP56bH! z7Hd7itVfhb&9LLoXZTSm^&jd_X+s0@y`TxzWBwmdZEXJ&1#=sk=nZ+A@?O>wdX9?J z1>okUA@i67S>!iy}*)CR>%cC6CS&x63xkDkvu%snby`;9<%YPS@psrunSk46;GFrIU2Srzxu@>-zF2M@`Y= znKElMtUSsUnyT67s|>e%x}HIKl%rlk!1QI^4y;;nt~_VRz47m#S#Y5L9~@fL3jKX% zLC^CUNuNvq8;7L7&n(=<%K%rzdS$E|^`GDn?zDD-lcL7C zk=sOvs7ZY$I_Wy3eUNIAd6JXnk9VW=NeOIkorsp_>KOLFk6af^N?-YTQm+z!qlicW1zC#32NC8?m*^Oov zI7EHAP~a2|D153@G^7~_gXt>55Q=`mDH_pigpKJ2!cb~I%_*8t5yCL~0bx^$f6*zL z(K3YL^Z;Q5b)W7O&1o&d79?JBik8#|VI*xqXeIN@P7y`P2&1VOVJmWf#few(sR&!s z0fcSHdxlfkXbi%(bOd2L3M_Pr_LPUP1C=7|NFlE}MJJky@L9TmFqXn!bBfM117RFp zMHo-fGo8Xtvk^Mz2Es1X{&lD5N<|10=m&(|DE`o65_Mq;woT4YKMc9kP zY^O-1J_wU&3qmKE-*k%Jl#H+s6(f9}+{r2WQYylJbO2$0@}7fHn2j-+;}8Ss2qf1x z-Kfc2he)QpxflmXmmm$Hka-vf!Z^%xh!na2$!iXLPH~9gG(&-xLAnhom7?dv%jUx8 z<~zhFx&bM89(=CIA=0U+2wpa?Dm^N)i_sLn0A8lR=N34`785<`C2A0wk}u(7xpk@iNU=j`l&i z4QU2NuR!~jqJ1kI;#Il#$x6&bIQqfAZ57I+OZ&3Uyv~M}ux5^=A(*sCRE6~2x z4neebHQEQs?QMscOMTvUig~mJp+e?0PBEX75f)J~!Ug2M)+rWJD#Ar{0O4Zte#a@6 z&=`bo(Gi48DR7-rETcSx%c&IM3JO{86f0>e!c}wu;c5!s;1qAu41{awD#EoCz0oP& zq1g!6(G7&_sr@FW*g!>_(2n(J$0mo^MDd%^jtywXW{2294*#1A9-)bIK7K; z+U^kV(-BCnTQN>M9O3}w?Z7xex&-MEh3v#QZNoV2bche>0wk~P7^hth@iEQVg>iy( z8`2SqF2*?Rz&I5<#8J8dDR?KwX}3chr=s0Xae{t8_zA`Daf*|)3}GofKzNF}?{$jP zv=-qR68oIuEcHQnjkbjlPawLi3KEePUamWuO+70nOMBfsJ{11snmL$=dk1<3LA4*i` zXcF~4j4?XukRMA_0@3vd2I-hX2+BK#L4tG%QUwY*jzKEHARXr|=>jCLqZp(Uyd}*z zfkA?F8*_|Ow>nzbo{u67B;R|Ib&}lhTPy5E%@p@ZrS0gHzHr2$35nJ0|4XUv9?xj#Q z6SqI+-TvzPGUpv=e2HzdUHc*!SHt*1TU7_w83j=FTdt-yT~JXfy|hBDQ}U3}SsAc( zB+!Xl339z|BJIF?U?{1`Gwjsy}d7axd}t@_1GQxchm?xJ%hT zxc9h0{IZeXP`&_616~BW0dYVAz)lzsu-|n7dI0Qi{J!@Wr2iGT4?F;Vllp2ejQQp9 zvjD$jZUJz&axXRm!hr~&Il%wIdH}V6+5kHo`VO5P z3RDC5a{xEM9k_>j`~>^}+y!`^Tme1@J_Ei5t^!{Imw_(;p0GTTd6M!3wvYuGGIBd z0vG}G0+N7SU>Yz9m;$^DYypO#0G^aQ35NnH0Iv!afk%k{6gUO!;@6)$!E6V%0V{z@ zFnSDH4D1H>0DFObz90ooDJ_am6SLD$NG!}RkXb&Vq*BeLzoRE71 zeF70Y55U``en5X<05A|31SCt!>rF)+WqniOlb;C1cpw|d0;U3azyx3{Fb+V2q{%=5 z@FFlB$N?q-F94YU*NoS(OkfzW0dg+r1Rw{<19({)2TTO`bBa+yvZ|jm@Mj4hAi*Kv zAn-o0A2^?AxfCIqia4cfga~SL2O2K^ zI`A!U4LAnyaPZVx3#$J$_icD8PXpMeUjW#@^8wbg zj{TVF6u|KFsmkrJzzbs4%i3m1hxSjw*lA$Yy>s~TYz_g?Z6ITH?RvR1~}~= zU@!1KPy!qQ4g=hS9|Io&AF_LW0Oka0B=|iKof!c0Jr%U;2!WJkIN5Weg=L5eg$}^ zGj_d)ka-8-o#1!y0#pLnJO2Uv2Ji%99ow?aW1VvRBY<~O_GHeJcho9?kzYmr9GLe| z-amQK=Dm}*78Ag$J?ji=WW;+b7i1K`TQ+Y={P{ZfIQuO3IPc56AM?J1kwpKAs`P?cJl(=XSDzA~0Ds4q8WplJh zkatP*S_v%ATWL9_GrTd04ZsXlP3$rCl50C7+d)1X#*fkecP2l9JgP z^jAq)#5wTZRuwsXbJy`@LP%n2Sq?RXauODi0lLMG^`Az)I4R(RG7CP?R7ct#%S(3B zsQA%kF*c$2#h{e#LWznI5%Mshq(L6Z+lK`yTpS<2wf^Gw6Fr{PS3yi=#LRB`{LBX1 z+eCW^-cJ0WSq@qS^iId*^R;r`U*;hOvt(s^6UrSf zUvIS%&|1`ySIA21)~Jl$1wMR;tA*qj9yR{y`&>JfRewnZWhGncouu65uo`@XyKu^`-0Z_qXCu&s;b9TsVa=px z%*u&&BErYGR+l{^?$NeY^(LuBppzwOsaf%94@WU>sb0SP{^E5J`=Tp~W?>O6+2Wu@ zc@7ql#=X`awv&y0eBIwJvoLPHdj0lWc#o3El(HD9va*EJ8h2%zbyzTPr%N@t%)+=$ z>*m?%#*&R=KQ4>0RaS0s&5ZlEsl~0=v~WrPrp#h!WhI~kayM@5E?+xk=gr^+mh zJH6{-Q~PJNZMdi`W?5w=gVXBQfD0Pc2yQN3{c2g6g>g|hZP4<^=VlkTFN^uIva%Ox zeTE?%$PJe1(`G zc=P=MMN?WVDtTNM<5Ekx%xUXkTH|O`(ErGoXFJ&^W4w*J8&{*P4L3wZ{NrS7S4+})afiDJd8`fU*FnYXMe!MQdxZHuMFxe0(^`c+Km=$oV9X7+C+7lM}$S- zzdwIv5$Ex{zp|sV=;E_DKs}!pEUdNjyE@bNj)G;gu;y&JEkN;$!{WFvKO)$Bxo zvNR4&n;WRD;H!QaKeT7nkKEL}o3XZ3-3N5UAXV6OAmt zz=9X3xq(XEcsQSN@p^xq`YwA04!VLA&9UaA^e#b4W;{wWu5wSW-Y+ffMX^XNEu2>@ ztoy-e{M?J-p{veT@MR5HrKRCJ>^G~FK?@-G_a#> zFV|Oc?4psaYXkK^h$k1v?|v{jbJ6)@r@2%tgGg#z9q;r09n+%Aw}TM_HMcRVq4K#M zr5cyVm%Q`l&Dyn2O@~DkEYPOP!Af-p+GN}wf0!mVXV&d>5f)fr*^-<<3sx#I=H0TQj;vsK*QUcamPII^sRamMpy5nmdk6u zUD&mL)4QAJq`|1fz4=(pEIpl#fJYqliOzfDb*NT%9i?4e9a z5PtGcHI*d^=sx4B{PxGUezepnsn08gx|vJmt;kvo*4vr@U2Fv-rA}a+h;2Z|&49%3C|Nm?W!Wdrn-B zG=3EzyzR>Rlcu@XQ3oZ0Ps;e=0QKN`;_H2x^IyOB1!6F;JX|GFO26)?nQ`;~{m_LM zV-mgWu;9UO&c79)%<7I|x)-H<02*oh)226O52N(t*7)GfV_;K9eA%t>LK~j=~=MkDT&rkwJEdM!uX+rvCatJnUfMq zV4*MXMYegpkkx9NQm2>b5?SsMQtck&r%(=8dQOSGo_S?)nm&7%NmYa*?yyRGpqG3QuWTB@SFmn4Sy7(bM;Tl2Va=*DR)|^t7l=n^0^Z=HGb-0l1u-EZC+cIp*tBSVNA4Ay*GR)*{(F| zjXdwzl>xoQFxeKRe8jTsP<~^1x?|_yJ|a^11a#H@B_8UrzVePPogN`8-XJi4Lb@v3 zo)_EXfUe5OzM?=L+->KrzCsrG=?CR*KXIKSF7}6?mNPl@UC8+Lhu|MBf1mAncPBFE zJsg?;H`9gxTSZIs=@MDabm4x3h4ITDtL81aWeOP79=W&Fc0bIZ%LBv$ALC~uP8WWE z$F0V1Ts~4hRlXdUsB9dFku`pBBJS3<75%<@yB#cfQev_D=YJDr(FjWiKgmhT??`L8 zo{0a9@vKQw{08AI;fO(^nf$Y#vS5(#tJ~UNbBvOolr~ci*Mc&!)Jm4-t_^PS(PiL8YG0#={%YXATM diff --git a/packages/keepalive-ws/docker-compose.yml b/packages/keepalive-ws/docker-compose.yml new file mode 100644 index 0000000..a05de07 --- /dev/null +++ b/packages/keepalive-ws/docker-compose.yml @@ -0,0 +1,16 @@ +version: "3.8" +services: + redis: + image: redis:7 + ports: + - "6379:6379" + command: ["redis-server", "--save", "", "--appendonly", "no"] + + redis-commander: + image: rediscommander/redis-commander:latest + environment: + - REDIS_HOSTS=local:redis:6379 + ports: + - "8081:8081" + depends_on: + - redis diff --git a/packages/keepalive-ws/package.json b/packages/keepalive-ws/package.json index ac8f3f0..fe0bbf0 100644 --- a/packages/keepalive-ws/package.json +++ b/packages/keepalive-ws/package.json @@ -40,6 +40,7 @@ "keywords": [], "license": "Apache-2.0", "dependencies": { + "ioredis": "^5.6.1", "ws": "^8.9.0" }, "devDependencies": { diff --git a/packages/keepalive-ws/src/client/client.ts b/packages/keepalive-ws/src/client/client.ts index 2b0acf2..29fb4d3 100644 --- a/packages/keepalive-ws/src/client/client.ts +++ b/packages/keepalive-ws/src/client/client.ts @@ -73,7 +73,21 @@ export class KeepAliveClient extends EventEmitter { private setupConnectionEvents(): void { // Forward relevant events from connection to client this.connection.on("message", (data) => { + // Forward the raw message event this.emit("message", data); + + // Also forward the specific command event if it's not a system event + // (System events like ping/latency are handled separately below) + const systemCommands = [ + "ping", + "pong", + "latency", + "latency:request", + "latency:response", + ]; + if (data.command && !systemCommands.includes(data.command)) { + this.emit(data.command, data.payload); + } }); this.connection.on("close", () => { @@ -150,8 +164,8 @@ export class KeepAliveClient extends EventEmitter { new CodeError( "WebSocket connection error", "ECONNECTION", - "ConnectionError", - ), + "ConnectionError" + ) ); }; } catch (error) { @@ -233,11 +247,12 @@ export class KeepAliveClient extends EventEmitter { if (attempt <= this.options.maxReconnectAttempts) { setTimeout(connect, this.options.reconnectInterval); - } else { - this.isReconnecting = false; - this._status = Status.OFFLINE; - this.emit("reconnectfailed"); + return; } + + this.isReconnecting = false; + this._status = Status.OFFLINE; + this.emit("reconnectfailed"); }; this.socket.onopen = () => { @@ -268,13 +283,13 @@ export class KeepAliveClient extends EventEmitter { command: string, payload?: any, expiresIn: number = 30000, - callback?: (result: any, error?: Error) => void, + callback?: (result: any, error?: Error) => void ): Promise { // Ensure we're connected before sending commands if (this._status !== Status.ONLINE) { return this.connect() .then(() => - this.connection.command(command, payload, expiresIn, callback), + this.connection.command(command, payload, expiresIn, callback) ) .catch((error) => { if (callback) { diff --git a/packages/keepalive-ws/src/client/connection.ts b/packages/keepalive-ws/src/client/connection.ts index 081a7ee..8f2a414 100644 --- a/packages/keepalive-ws/src/client/connection.ts +++ b/packages/keepalive-ws/src/client/connection.ts @@ -115,7 +115,7 @@ export class Connection extends EventEmitter { command: string, payload: any, expiresIn: number | null = 30_000, - callback?: (result: any, error?: Error) => void, + callback?: (result: any, error?: Error) => void ): Promise | null { const id = this.ids.reserve(); const cmd: Command = { id, command, payload: payload ?? {} }; @@ -142,17 +142,17 @@ export class Connection extends EventEmitter { const timeoutPromise = new Promise((_, reject) => { setTimeout(() => { - if (this.callbacks[id]) { - this.ids.release(id); - delete this.callbacks[id]; - reject( - new CodeError( - `Command timed out after ${expiresIn}ms.`, - "ETIMEOUT", - "TimeoutError", - ), - ); - } + if (!this.callbacks[id]) return; + + this.ids.release(id); + delete this.callbacks[id]; + reject( + new CodeError( + `Command timed out after ${expiresIn}ms.`, + "ETIMEOUT", + "TimeoutError" + ) + ); }, expiresIn); }); diff --git a/packages/keepalive-ws/src/client/ids.ts b/packages/keepalive-ws/src/client/ids.ts index 1d0d252..942ac49 100644 --- a/packages/keepalive-ws/src/client/ids.ts +++ b/packages/keepalive-ws/src/client/ids.ts @@ -10,7 +10,7 @@ export class IdManager { release(id: number) { if (id < 0 || id > this.maxIndex) { throw new TypeError( - `ID must be between 0 and ${this.maxIndex}. Got ${id}.`, + `ID must be between 0 and ${this.maxIndex}. Got ${id}.` ); } this.ids[id] = false; @@ -36,7 +36,7 @@ export class IdManager { if (this.index === startIndex) { throw new Error( - `All IDs are reserved. Make sure to release IDs when they are no longer used.`, + `All IDs are reserved. Make sure to release IDs when they are no longer used.` ); } } diff --git a/packages/keepalive-ws/src/server/connection.ts b/packages/keepalive-ws/src/server/connection.ts index 81d9af2..d81bb4e 100644 --- a/packages/keepalive-ws/src/server/connection.ts +++ b/packages/keepalive-ws/src/server/connection.ts @@ -20,7 +20,7 @@ export class Connection extends EventEmitter { constructor( socket: WebSocket, req: IncomingMessage, - options: KeepAliveServerOptions, + options: KeepAliveServerOptions ) { super(); this.socket = socket; diff --git a/packages/keepalive-ws/src/server/index.ts b/packages/keepalive-ws/src/server/index.ts index 04992fc..7004eb1 100644 --- a/packages/keepalive-ws/src/server/index.ts +++ b/packages/keepalive-ws/src/server/index.ts @@ -1,5 +1,11 @@ import { IncomingMessage } from "node:http"; import { ServerOptions, WebSocket, WebSocketServer } from "ws"; +import type { RedisOptions } from "ioredis"; +import { + RoomManager, + InMemoryRoomManager, + RedisRoomManager, +} from "./room-manager"; import { CodeError } from "../common/codeerror"; import { Command, parseCommand } from "../common/message"; import { Status } from "../common/status"; @@ -34,6 +40,16 @@ export type KeepAliveServerOptions = ServerOptions & { * @default 5000 */ latencyInterval?: number; + + /** + * Room backend type: "memory" (default) or "redis" + */ + roomBackend?: "memory" | "redis"; + + /** + * Redis options, required if roomBackend is "redis" + */ + redisOptions?: RedisOptions; }; export class KeepAliveServer extends WebSocketServer { @@ -44,7 +60,7 @@ export class KeepAliveServer extends WebSocketServer { } = {}; globalMiddlewares: SocketMiddleware[] = []; middlewares: { [key: string]: SocketMiddleware[] } = {}; - rooms: { [roomName: string]: Set } = {}; + roomManager: RoomManager; serverOptions: ServerOptions & { pingInterval: number; latencyInterval: number; @@ -67,6 +83,23 @@ export class KeepAliveServer extends WebSocketServer { latencyInterval: opts.latencyInterval ?? 5_000, }; + // Room manager selection + if (opts.roomBackend === "redis") { + if (!opts.redisOptions) { + throw new Error( + "redisOptions must be provided when roomBackend is 'redis'" + ); + } + this.roomManager = new RedisRoomManager( + opts.redisOptions, + (id: string) => this.connections[id] + ); + } else { + this.roomManager = new InMemoryRoomManager( + (id: string) => this.connections[id] + ); + } + this.on("listening", () => { this._listening = true; this.status = Status.ONLINE; @@ -80,14 +113,14 @@ export class KeepAliveServer extends WebSocketServer { this.applyListeners(); } - private cleanupConnection(connection: Connection): void { + private async cleanupConnection(connection: Connection): Promise { connection.stopIntervals(); delete this.connections[connection.id]; if (this.remoteAddressToConnections[connection.remoteAddress]) { this.remoteAddressToConnections[connection.remoteAddress] = this.remoteAddressToConnections[connection.remoteAddress].filter( - (conn) => conn.id !== connection.id, + (conn) => conn.id !== connection.id ); if ( @@ -98,9 +131,7 @@ export class KeepAliveServer extends WebSocketServer { } // Remove from all rooms - Object.keys(this.rooms).forEach((roomName) => { - this.rooms[roomName].delete(connection.id); - }); + await this.roomManager.removeFromAllRooms(connection); } private applyListeners(): void { @@ -113,13 +144,13 @@ export class KeepAliveServer extends WebSocketServer { } this.remoteAddressToConnections[connection.remoteAddress].push( - connection, + connection ); this.emit("connected", connection); - connection.on("close", () => { - this.cleanupConnection(connection); + connection.on("close", async () => { + await this.cleanupConnection(connection); this.emit("close", connection); }); @@ -137,7 +168,7 @@ export class KeepAliveServer extends WebSocketServer { command.id, command.command, command.payload, - connection, + connection ); } } catch (error) { @@ -172,7 +203,7 @@ export class KeepAliveServer extends WebSocketServer { broadcastRemoteAddress( connection: Connection, command: string, - payload: any, + payload: any ): void { const cmd: Command = { command, payload }; const connections = @@ -194,47 +225,30 @@ export class KeepAliveServer extends WebSocketServer { * Given a roomName, a command and a payload, broadcasts to all Connections * that are in the room. */ - broadcastRoom(roomName: string, command: string, payload: any): void { - const cmd: Command = { command, payload }; - const room = this.rooms[roomName]; - - if (!room) return; - - room.forEach((connectionId) => { - const connection = this.connections[connectionId]; - if (connection) { - connection.send(cmd); - } - }); + async broadcastRoom( + roomName: string, + command: string, + payload: any + ): Promise { + await this.roomManager.broadcastRoom(roomName, command, payload); } /** * Given a roomName, command, payload, and Connection OR Connection[], broadcasts to all Connections * that are in the room except the provided Connection(s). */ - broadcastRoomExclude( + async broadcastRoomExclude( roomName: string, command: string, payload: any, - connection: Connection | Connection[], - ): void { - const cmd: Command = { command, payload }; - const room = this.rooms[roomName]; - - if (!room) return; - - const excludeIds = Array.isArray(connection) - ? connection.map((c) => c.id) - : [connection.id]; - - room.forEach((connectionId) => { - if (!excludeIds.includes(connectionId)) { - const conn = this.connections[connectionId]; - if (conn) { - conn.send(cmd); - } - } - }); + connection: Connection | Connection[] + ): Promise { + await this.roomManager.broadcastRoomExclude( + roomName, + command, + payload, + connection + ); } /** @@ -244,7 +258,7 @@ export class KeepAliveServer extends WebSocketServer { broadcastExclude( connection: Connection, command: string, - payload: any, + payload: any ): void { const cmd: Command = { command, payload }; @@ -258,46 +272,39 @@ export class KeepAliveServer extends WebSocketServer { /** * Add a connection to a room */ - addToRoom(roomName: string, connection: Connection): void { - this.rooms[roomName] = this.rooms[roomName] ?? new Set(); - this.rooms[roomName].add(connection.id); + async addToRoom(roomName: string, connection: Connection): Promise { + await this.roomManager.addToRoom(roomName, connection); } /** * Remove a connection from a room */ - removeFromRoom(roomName: string, connection: Connection): void { - if (!this.rooms[roomName]) return; - this.rooms[roomName].delete(connection.id); + async removeFromRoom( + roomName: string, + connection: Connection + ): Promise { + await this.roomManager.removeFromRoom(roomName, connection); } /** * Remove a connection from all rooms */ - removeFromAllRooms(connection: Connection | string): void { - const connectionId = - typeof connection === "string" ? connection : connection.id; - - Object.keys(this.rooms).forEach((roomName) => { - this.rooms[roomName].delete(connectionId); - }); + async removeFromAllRooms(connection: Connection | string): Promise { + await this.roomManager.removeFromAllRooms(connection); } /** * Returns all connections in a room */ - getRoom(roomName: string): Connection[] { - const ids = this.rooms[roomName] || new Set(); - return Array.from(ids) - .map((id) => this.connections[id]) - .filter(Boolean); + async getRoom(roomName: string): Promise { + return this.roomManager.getRoom(roomName); } /** * Clear all connections from a room */ - clearRoom(roomName: string): void { - this.rooms[roomName] = new Set(); + async clearRoom(roomName: string): Promise { + await this.roomManager.clearRoom(roomName); } /** @@ -306,7 +313,7 @@ export class KeepAliveServer extends WebSocketServer { async registerCommand( command: string, callback: (context: WSContext) => Promise | T, - middlewares: SocketMiddleware[] = [], + middlewares: SocketMiddleware[] = [] ): Promise { this.commands[command] = callback; @@ -322,7 +329,7 @@ export class KeepAliveServer extends WebSocketServer { */ prependMiddlewareToCommand( command: string, - middlewares: SocketMiddleware[], + middlewares: SocketMiddleware[] ): void { if (middlewares.length) { this.middlewares[command] = this.middlewares[command] || []; @@ -335,7 +342,7 @@ export class KeepAliveServer extends WebSocketServer { */ appendMiddlewareToCommand( command: string, - middlewares: SocketMiddleware[], + middlewares: SocketMiddleware[] ): void { if (middlewares.length) { this.middlewares[command] = this.middlewares[command] || []; @@ -350,7 +357,7 @@ export class KeepAliveServer extends WebSocketServer { id: number, command: string, payload: any, - connection: Connection, + connection: Connection ): Promise { const context = new WSContext(this, connection, payload); @@ -359,7 +366,7 @@ export class KeepAliveServer extends WebSocketServer { throw new CodeError( `Command [${command}] not found.`, "ENOTFOUND", - "CommandError", + "CommandError" ); } diff --git a/packages/keepalive-ws/src/server/room-manager.ts b/packages/keepalive-ws/src/server/room-manager.ts new file mode 100644 index 0000000..222797d --- /dev/null +++ b/packages/keepalive-ws/src/server/room-manager.ts @@ -0,0 +1,192 @@ +import { Connection } from "./connection"; +import Redis from "ioredis"; +import type { RedisOptions } from "ioredis"; + +export interface RoomManager { + addToRoom(roomName: string, connection: Connection): Promise; + removeFromRoom(roomName: string, connection: Connection): Promise; + removeFromAllRooms(connection: Connection | string): Promise; + getRoom(roomName: string): Promise; + clearRoom(roomName: string): Promise; + broadcastRoom(roomName: string, command: string, payload: any): Promise; + broadcastRoomExclude( + roomName: string, + command: string, + payload: any, + connection: Connection | Connection[] + ): Promise; +} + +export class InMemoryRoomManager implements RoomManager { + private rooms: { [roomName: string]: Set } = {}; + private getConnectionById: (id: string) => Connection | undefined; + + constructor(getConnectionById: (id: string) => Connection | undefined) { + this.getConnectionById = getConnectionById; + } + + async addToRoom(roomName: string, connection: Connection): Promise { + this.rooms[roomName] = this.rooms[roomName] ?? new Set(); + this.rooms[roomName].add(connection.id); + } + + async removeFromRoom( + roomName: string, + connection: Connection + ): Promise { + if (!this.rooms[roomName]) return; + this.rooms[roomName].delete(connection.id); + } + + async removeFromAllRooms(connection: Connection | string): Promise { + const connectionId = + typeof connection === "string" ? connection : connection.id; + Object.keys(this.rooms).forEach((roomName) => { + this.rooms[roomName].delete(connectionId); + }); + } + + async getRoom(roomName: string): Promise { + const ids = this.rooms[roomName] || new Set(); + return Array.from(ids) + .map((id) => this.getConnectionById(id)) + .filter(Boolean) as Connection[]; + } + + async clearRoom(roomName: string): Promise { + this.rooms[roomName] = new Set(); + } + + async broadcastRoom( + roomName: string, + command: string, + payload: any + ): Promise { + const ids = this.rooms[roomName]; + if (!ids) return; + for (const connectionId of ids) { + const connection = this.getConnectionById(connectionId); + if (connection) { + connection.send({ command, payload }); + } + } + } + + async broadcastRoomExclude( + roomName: string, + command: string, + payload: any, + connection: Connection | Connection[] + ): Promise { + const ids = this.rooms[roomName]; + if (!ids) return; + const excludeIds = Array.isArray(connection) + ? connection.map((c) => c.id) + : [connection.id]; + for (const connectionId of ids) { + if (!excludeIds.includes(connectionId)) { + const conn = this.getConnectionById(connectionId); + if (conn) { + conn.send({ command, payload }); + } + } + } + } +} + +export class RedisRoomManager implements RoomManager { + private redis: Redis; + private getConnectionById: (id: string) => Connection | undefined; + + constructor( + redisOptions: RedisOptions, + getConnectionById: (id: string) => Connection | undefined + ) { + this.redis = new Redis(redisOptions); + this.getConnectionById = getConnectionById; + // TODO: reconnect logic? + } + + private roomKey(roomName: string) { + return `room:${roomName}`; + } + + private connRoomsKey(connectionId: string) { + return `connection:${connectionId}:rooms`; + } + + async addToRoom(roomName: string, connection: Connection): Promise { + await this.redis.sadd(this.roomKey(roomName), connection.id); + await this.redis.sadd(this.connRoomsKey(connection.id), roomName); + } + + async removeFromRoom( + roomName: string, + connection: Connection + ): Promise { + await this.redis.srem(this.roomKey(roomName), connection.id); + await this.redis.srem(this.connRoomsKey(connection.id), roomName); + } + + async removeFromAllRooms(connection: Connection | string): Promise { + const connectionId = + typeof connection === "string" ? connection : connection.id; + const roomNames = await this.redis.smembers( + this.connRoomsKey(connectionId) + ); + + if (!(roomNames.length > 0)) return; + + const pipeline = this.redis.pipeline(); + for (const roomName of roomNames) { + pipeline.srem(this.roomKey(roomName), connectionId); + } + pipeline.del(this.connRoomsKey(connectionId)); + await pipeline.exec(); + } + + async getRoom(roomName: string): Promise { + const ids = await this.redis.smembers(this.roomKey(roomName)); + return ids + .map((id) => this.getConnectionById(id)) + .filter(Boolean) as Connection[]; + } + + async clearRoom(roomName: string): Promise { + await this.redis.del(this.roomKey(roomName)); + } + + async broadcastRoom( + roomName: string, + command: string, + payload: any + ): Promise { + const ids = await this.redis.smembers(this.roomKey(roomName)); + for (const connectionId of ids) { + const connection = this.getConnectionById(connectionId); + if (connection) { + connection.send({ command, payload }); + } + } + } + + async broadcastRoomExclude( + roomName: string, + command: string, + payload: any, + connection: Connection | Connection[] + ): Promise { + const ids = await this.redis.smembers(this.roomKey(roomName)); + const excludeIds = Array.isArray(connection) + ? connection.map((c) => c.id) + : [connection.id]; + for (const connectionId of ids) { + if (!excludeIds.includes(connectionId)) { + const conn = this.getConnectionById(connectionId); + if (conn) { + conn.send({ command, payload }); + } + } + } + } +} diff --git a/packages/keepalive-ws/tests/advanced.test.ts b/packages/keepalive-ws/tests/advanced.test.ts index dc1172a..790d6ba 100644 --- a/packages/keepalive-ws/tests/advanced.test.ts +++ b/packages/keepalive-ws/tests/advanced.test.ts @@ -2,13 +2,12 @@ import { describe, test, expect, beforeEach, afterEach } from "vitest"; import { KeepAliveClient, Status } from "../src/client/client"; import { KeepAliveServer } from "../src/server/index"; -const createTestServer = (port: number) => { - return new KeepAliveServer({ +const createTestServer = (port: number) => + new KeepAliveServer({ port, pingInterval: 1000, latencyInterval: 500, }); -}; describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => { const port = 8125; @@ -49,14 +48,15 @@ describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => { }); test("command times out when server doesn't respond", async () => { - await server.registerCommand("never-responds", async () => { - return new Promise(() => {}); - }); + await server.registerCommand( + "never-responds", + async () => new Promise(() => {}) + ); await client.connect(); await expect( - client.command("never-responds", "Should timeout", 500), + client.command("never-responds", "Should timeout", 500) ).rejects.toThrow(/timed out/); }, 2000); @@ -82,9 +82,10 @@ describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => { return `Slow: ${context.payload}`; }); - await server.registerCommand("echo", async (context) => { - return `Echo: ${context.payload}`; - }); + await server.registerCommand( + "echo", + async (context) => `Echo: ${context.payload}` + ); await client.connect(); @@ -98,9 +99,7 @@ describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => { }, 3000); test("handles large payloads correctly", async () => { - await server.registerCommand("echo", async (context) => { - return context.payload; - }); + await server.registerCommand("echo", async (context) => context.payload); await client.connect(); @@ -123,9 +122,10 @@ describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => { }, 10000); test("server handles multiple client connections", async () => { - await server.registerCommand("echo", async (context) => { - return `Echo: ${context.payload}`; - }); + await server.registerCommand( + "echo", + async (context) => `Echo: ${context.payload}` + ); const clients = Array(5) .fill(0) @@ -134,7 +134,7 @@ describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => { await Promise.all(clients.map((client) => client.connect())); const results = await Promise.all( - clients.map((client, i) => client.command("echo", `Client ${i}`, 1000)), + clients.map((client, i) => client.command("echo", `Client ${i}`, 1000)) ); results.forEach((result, i) => { diff --git a/packages/keepalive-ws/tests/basic.test.ts b/packages/keepalive-ws/tests/basic.test.ts index b1ea141..acee5d3 100644 --- a/packages/keepalive-ws/tests/basic.test.ts +++ b/packages/keepalive-ws/tests/basic.test.ts @@ -2,13 +2,12 @@ import { describe, test, expect, beforeEach, afterEach } from "vitest"; import { KeepAliveClient, Status } from "../src/client/client"; import { KeepAliveServer } from "../src/server/index"; -const createTestServer = (port: number) => { - return new KeepAliveServer({ +const createTestServer = (port: number) => + new KeepAliveServer({ port, pingInterval: 1000, latencyInterval: 500, }); -}; describe("Basic KeepAliveClient and KeepAliveServer Tests", () => { const port = 8124; @@ -48,18 +47,17 @@ describe("Basic KeepAliveClient and KeepAliveServer Tests", () => { }); test("client-server connection should be online", async () => { - await server.registerCommand("echo", async (context) => { - return context.payload; - }); + await server.registerCommand("echo", async (context) => context.payload); await client.connect(); expect(client.status).toBe(Status.ONLINE); }, 10000); test("simple echo command", async () => { - await server.registerCommand("echo", async (context) => { - return `Echo: ${context.payload}`; - }); + await server.registerCommand( + "echo", + async (context) => `Echo: ${context.payload}` + ); await client.connect(); @@ -68,9 +66,7 @@ describe("Basic KeepAliveClient and KeepAliveServer Tests", () => { }, 10000); test("connect should resolve when already connected", async () => { - await server.registerCommand("echo", async (context) => { - return context.payload; - }); + await server.registerCommand("echo", async (context) => context.payload); await client.connect(); expect(client.status).toBe(Status.ONLINE); diff --git a/packages/keepalive-ws/tests/redis-room.test.ts b/packages/keepalive-ws/tests/redis-room.test.ts new file mode 100644 index 0000000..17f53e0 --- /dev/null +++ b/packages/keepalive-ws/tests/redis-room.test.ts @@ -0,0 +1,343 @@ +import { describe, test, expect, beforeEach, afterEach } from "vitest"; +import Redis from "ioredis"; +import { KeepAliveClient, Status } from "../src/client/client"; +import { KeepAliveServer } from "../src/server/index"; + +const REDIS_HOST = process.env.REDIS_HOST || "127.0.0.1"; +const REDIS_PORT = process.env.REDIS_PORT + ? parseInt(process.env.REDIS_PORT, 10) + : 6379; + +const createRedisServer = (port: number) => + new KeepAliveServer({ + port, + pingInterval: 1000, + latencyInterval: 500, + roomBackend: "redis", + redisOptions: { host: REDIS_HOST, port: REDIS_PORT }, + }); + +const flushRedis = async () => { + const redis = new Redis({ host: REDIS_HOST, port: REDIS_PORT }); + await redis.flushdb(); + await redis.quit(); +}; + +describe("KeepAliveServer with Redis room backend", () => { + const port = 8126; + let server: KeepAliveServer; + let clientA: KeepAliveClient; + let clientB: KeepAliveClient; + + beforeEach(async () => { + await flushRedis(); + + server = createRedisServer(port); + + await new Promise((resolve) => { + server.on("listening", () => resolve()); + if (server.listening) resolve(); + }); + + clientA = new KeepAliveClient(`ws://localhost:${port}`); + clientB = new KeepAliveClient(`ws://localhost:${port}`); + }); + + afterEach(async () => { + if (clientA.status === Status.ONLINE) await clientA.close(); + if (clientB.status === Status.ONLINE) await clientB.close(); + + return new Promise((resolve) => { + if (server) { + server.close(() => resolve()); + } else { + resolve(); + } + }); + }); + + test("multi-instance room membership and broadcast with Redis", async () => { + await server.registerCommand("join-room", async (context) => { + await server.addToRoom(context.payload.room, context.connection); + return { joined: true }; + }); + + await server.registerCommand("broadcast-room", async (context) => { + await server.broadcastRoom( + context.payload.room, + "room-message", + context.payload.message + ); + return { sent: true }; + }); + + await clientA.connect(); + await clientB.connect(); + + let receivedA = false; + let receivedB = false; + + clientA.on("room-message", (data) => { + if (data === "hello") receivedA = true; + }); + clientB.on("room-message", (data) => { + if (data === "hello") receivedB = true; + }); + + await clientA.command("join-room", { room: "testroom" }); + await clientB.command("join-room", { room: "testroom" }); + + await clientA.command("broadcast-room", { + room: "testroom", + message: "hello", + }); + + // Wait for both events or timeout + await new Promise((resolve, reject) => { + const timeout = setTimeout(resolve, 2000); + const check = () => { + if (receivedA && receivedB) { + clearTimeout(timeout); + resolve(null); + } + }; + clientA.on("room-message", check); + clientB.on("room-message", check); + }); + + expect(receivedA).toBe(true); + expect(receivedB).toBe(true); + }, 10000); + + test("removeFromRoom removes a client from a specific room", async () => { + await server.registerCommand("join-room", async (context) => { + await server.addToRoom(context.payload.room, context.connection); + return { joined: true }; + }); + await server.registerCommand("leave-room", async (context) => { + await server.removeFromRoom(context.payload.room, context.connection); + return { left: true }; + }); + await server.registerCommand("broadcast-room", async (context) => { + await server.broadcastRoom( + context.payload.room, + "room-message", + context.payload.message + ); + return { sent: true }; + }); + + await clientA.connect(); + await clientB.connect(); + + let receivedA = false; + let receivedB = false; + + clientA.on("room-message", (data) => { + if (data === "hello after leave") receivedA = true; + }); + clientB.on("room-message", (data) => { + if (data === "hello after leave") receivedB = true; + }); + + await clientA.command("join-room", { room: "testroom-leave" }); + await clientB.command("join-room", { room: "testroom-leave" }); + + // Ensure both are in before leaving + await new Promise((res) => setTimeout(res, 100)); // Short delay for redis propagation + + await clientA.command("leave-room", { room: "testroom-leave" }); + + // Wait a bit for leave command to process + await new Promise((res) => setTimeout(res, 100)); + + await clientB.command("broadcast-room", { + room: "testroom-leave", + message: "hello after leave", + }); + + // Wait for potential message or timeout + await new Promise((resolve) => setTimeout(resolve, 500)); + + expect(receivedA).toBe(false); // Client A should not receive the message + expect(receivedB).toBe(true); // Client B should receive the message + }, 10000); + + test("removeFromAllRooms removes a client from all rooms", async () => { + await server.registerCommand("join-room", async (context) => { + await server.addToRoom(context.payload.room, context.connection); + return { joined: true }; + }); + await server.registerCommand("leave-all-rooms", async (context) => { + await server.removeFromAllRooms(context.connection); + return { left_all: true }; + }); + await server.registerCommand("broadcast-room", async (context) => { + await server.broadcastRoom( + context.payload.room, + "room-message", + context.payload.message + ); + return { sent: true }; + }); + + await clientA.connect(); + await clientB.connect(); + + let receivedA_room1 = false; + let receivedA_room2 = false; + let receivedB_room1 = false; + + clientA.on("room-message", (data) => { + if (data === "hello room1 after all") receivedA_room1 = true; + if (data === "hello room2 after all") receivedA_room2 = true; + }); + clientB.on("room-message", (data) => { + if (data === "hello room1 after all") receivedB_room1 = true; + }); + + await clientA.command("join-room", { room: "room1" }); + await clientA.command("join-room", { room: "room2" }); + await clientB.command("join-room", { room: "room1" }); + + // Ensure joins are processed + await new Promise((res) => setTimeout(res, 100)); + + await clientA.command("leave-all-rooms", {}); + + // Wait a bit for leave command to process + await new Promise((res) => setTimeout(res, 100)); + + // Broadcast to room1 + await clientB.command("broadcast-room", { + room: "room1", + message: "hello room1 after all", + }); + // Broadcast to room2 (no one should be left) + await clientB.command("broadcast-room", { + // Client B isn't in room2, but can still broadcast + room: "room2", + message: "hello room2 after all", + }); + + // Wait for potential messages or timeout + await new Promise((resolve) => setTimeout(resolve, 500)); + + expect(receivedA_room1).toBe(false); // Client A should not receive from room1 + expect(receivedA_room2).toBe(false); // Client A should not receive from room2 + expect(receivedB_room1).toBe(true); // Client B should receive from room1 + }, 10000); + + test("clearRoom removes all clients from a room", async () => { + await server.registerCommand("join-room", async (context) => { + await server.addToRoom(context.payload.room, context.connection); + return { joined: true }; + }); + await server.registerCommand("clear-room", async (context) => { + await server.clearRoom(context.payload.room); + return { cleared: true }; + }); + await server.registerCommand("broadcast-room", async (context) => { + await server.broadcastRoom( + context.payload.room, + "room-message", + context.payload.message + ); + return { sent: true }; + }); + + await clientA.connect(); + await clientB.connect(); + + let receivedA = false; + let receivedB = false; + + clientA.on("room-message", (data) => { + if (data === "hello after clear") receivedA = true; + }); + clientB.on("room-message", (data) => { + if (data === "hello after clear") receivedB = true; + }); + + await clientA.command("join-room", { room: "testroom-clear" }); + await clientB.command("join-room", { room: "testroom-clear" }); + + // Ensure joins are processed + await new Promise((res) => setTimeout(res, 100)); + + await clientA.command("clear-room", { room: "testroom-clear" }); + + // Wait a bit for clear command to process + await new Promise((res) => setTimeout(res, 100)); + + // Try broadcasting (client A is still connected, just not in room) + await clientA.command("broadcast-room", { + room: "testroom-clear", + message: "hello after clear", + }); + + // Wait for potential messages or timeout + await new Promise((resolve) => setTimeout(resolve, 500)); + + expect(receivedA).toBe(false); // Client A should not receive + expect(receivedB).toBe(false); // Client B should not receive + }, 10000); + + test("broadcastRoomExclude sends to all except specified clients", async () => { + const clientC = new KeepAliveClient(`ws://localhost:${port}`); + + await server.registerCommand("join-room", async (context) => { + await server.addToRoom(context.payload.room, context.connection); + return { joined: true }; + }); + await server.registerCommand("broadcast-exclude", async (context) => { + await server.broadcastRoomExclude( + context.payload.room, + "room-message", + context.payload.message, + context.connection // Exclude sender + ); + return { sent_exclude: true }; + }); + + await clientA.connect(); + await clientB.connect(); + await clientC.connect(); + + let receivedA = false; + let receivedB = false; + let receivedC = false; + + clientA.on("room-message", (data) => { + if (data === "hello exclude") receivedA = true; + }); + clientB.on("room-message", (data) => { + if (data === "hello exclude") receivedB = true; + }); + clientC.on("room-message", (data) => { + if (data === "hello exclude") receivedC = true; + }); + + await clientA.command("join-room", { room: "testroom-exclude" }); + await clientB.command("join-room", { room: "testroom-exclude" }); + await clientC.command("join-room", { room: "testroom-exclude" }); + + // Ensure joins are processed + await new Promise((res) => setTimeout(res, 100)); + + // Client A broadcasts, excluding itself + await clientA.command("broadcast-exclude", { + room: "testroom-exclude", + message: "hello exclude", + }); + + // Wait for potential messages or timeout + await new Promise((resolve) => setTimeout(resolve, 500)); + + expect(receivedA).toBe(false); // Client A (sender) should not receive + expect(receivedB).toBe(true); // Client B should receive + expect(receivedC).toBe(true); // Client C should receive + + if (clientC.status === Status.ONLINE) await clientC.close(); + }, 10000); +});