From 3b147dd5bc9db5e9cd75418048dd0a5ea4b62db2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 18 Mar 2022 22:00:20 +0800 Subject: [PATCH] handle except --- include/libs/transport/trpc.h | 30 +++--- source/libs/transport/inc/transComm.h | 37 +++++--- source/libs/transport/inc/transportInt.h | 3 - source/libs/transport/src/.transCli.c.swn | Bin 0 -> 49152 bytes source/libs/transport/src/trans.c | 12 ++- source/libs/transport/src/transCli.c | 17 +++- source/libs/transport/src/transComm.c | 56 ++++++++++- source/libs/transport/src/transSrv.c | 20 ++-- source/libs/transport/test/transUT.cc | 37 +------- source/libs/transport/test/transportTests.cc | 94 +++++++++++++++++++ 10 files changed, 225 insertions(+), 81 deletions(-) create mode 100644 source/libs/transport/src/.transCli.c.swn diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index d8dcf72bed..aae0c6bd22 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -70,12 +70,19 @@ typedef struct SRpcInit { // call back to retrieve the client auth info, for server app only int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); - // to support Send messages multiple times on a link - void *(*mfp)(void *parent, tmsg_t msgType); - void *parent; } SRpcInit; +typedef struct { + void * val; + int32_t len; + void (*free)(void *arg); +} SRpcCtxVal; + +typedef struct { + SHashObj *args; +} SRpcCtx; + int32_t rpcInit(); void rpcCleanup(); void * rpcOpen(const SRpcInit *pRpc); @@ -84,16 +91,17 @@ void * rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void * rpcReallocCont(void *ptr, int contLen); void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); -void rpcSendResponse(const SRpcMsg *pMsg); -void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); -int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCancelRequest(int64_t rid); +void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); +void rpcSendResponse(const SRpcMsg *pMsg); +void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); +int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); +void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +int rpcReportProgress(void *pConn, char *pCont, int contLen); +void rpcCancelRequest(int64_t rid); +void rpcRegisterBrokenLinkArg(SRpcMsg *msg); // just release client conn to rpc instance, no close sock -void rpcReleaseHandle(void *handle, int8_t type); - +void rpcReleaseHandle(void *handle, int8_t type); // void rpcRefHandle(void *handle, int8_t type); void rpcUnrefHandle(void *handle, int8_t type); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a60531a429..a939bbd644 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -14,6 +14,10 @@ */ #ifdef USE_UV +#ifdef __cplusplus +extern "C" { +#endif + #include #include "lz4.h" #include "os.h" @@ -121,24 +125,21 @@ typedef struct { } SRpcReqContext; typedef SRpcMsg STransMsg; +typedef SRpcCtx STransCtx; +typedef SRpcCtxVal STransCtxVal; typedef SRpcInfo STrans; typedef SRpcConnInfo STransHandleInfo; typedef struct { - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app - tmsg_t msgType; // message type - uint8_t* pCont; // content provided by app - int32_t contLen; // content length - // int32_t code; // error code - // int16_t numOfTry; // number of try for different servers - // int8_t oldInUse; // server EP inUse passed by app - // int8_t redirect; // flag to indicate redirect + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app + tmsg_t msgType; // message type int8_t connType; // connection type cli/srv int64_t rid; // refId returned by taosAddRef - STransMsg* pRsp; // for synchronous API - tsem_t* pSem; // for synchronous API + STransCtx appCtx; // + STransMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API int hThrdIdx; char* ip; @@ -181,7 +182,7 @@ typedef struct { #pragma pack(pop) typedef enum { Normal, Quit, Release } STransMsgType; -typedef enum { ConnNormal, ConnAcquire, ConnRelease } ConnStatus; +typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) #define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) @@ -259,7 +260,7 @@ void transUnrefCliHandle(void* handle); void transReleaseCliHandle(void* handle); void transReleaseSrvHandle(void* handle); -void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg); +void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* pCtx); void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); void transSendResponse(const STransMsg* pMsg); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); @@ -270,4 +271,14 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void transCloseClient(void* arg); void transCloseServer(void* arg); +void transCtxInit(STransCtx* ctx); +void transCtxDestroy(STransCtx* ctx); +void transCtxClear(STransCtx* ctx); +void transCtxMerge(STransCtx* dst, STransCtx* src); +void* transCtxDumpVal(STransCtx* ctx, int32_t key); + +#ifdef __cplusplus +} +#endif + #endif diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index e739380467..1395408960 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -63,9 +63,6 @@ typedef struct { void (*cfp)(void* parent, SRpcMsg*, SEpSet*); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); - bool (*pfp)(void* parent, tmsg_t msgType); - void* (*mfp)(void* parent, tmsg_t msgType); - bool (*efp)(void* parent, tmsg_t msgType); int32_t refCount; void* parent; diff --git a/source/libs/transport/src/.transCli.c.swn b/source/libs/transport/src/.transCli.c.swn new file mode 100644 index 0000000000000000000000000000000000000000..583fbc9f74257c1841caaac6e0c3890d57c227da GIT binary patch literal 49152 zcmeI537A}0b?2J^#w>P7SPaa7pJgPfrPk^d%aUoeT2{wWOYTLsuuW54{kpqwbycaV zS_@tP1BrvF?mYPErx zZ$7@NPk&X{d-vVt+;h)8=iGDeo8EL_WN&othRuC^9@*FTy74oo-*@}x`~J(b`}!KS zT1%IY_nAF&_MEexI&=29XFPRwxwIQ6cE;7Yay8yKS8k<0nzhA7Dc)EqPd7KV8pUd} zUTd^AHX9}PYp7D*P+BhA-n||LdK5Sw1r}Sg>(4!@?~E;*H!11qt4@m^_1lxjd#ZQ8 zM}Zy%dKBnUphtlo1$q?dQJ_bG9tD1lDA1}utnWFr=Qn(tj{5Hhw0%Fw|1SFPV{P{z z?0@g_-(P6E|8W0%wf{cN7ZA#MsJ~wI-@|S9Pw>~D;JDtnjp|6u?B>^J-RZUg@vw7^>Mt&{uuz7AdrE&^-7>ELrG z_4WM`xBii3#H0Md>Nm_-+(^> zF9X+tLGVCu8~wWnTm;nDPXa6WG+V`1xfC6)m1m+-r92*2XD(hWw+iD#WlYRBX7)`# zYi-oxkN&N+HIulomgIxYW7X2`Vs)kxUv@>mJ+!wuw>GNt|3H+>qpsZgqixYuJm(%6 ziw_N`Q1?s4W`2&vmAKf9ui2U_HA(Gnub{{-86>S-zQSFers^vydgI1u-pCU*;zQAF zqqYzg>-9V(lQs<>EyZ<5`k`92x;85DKXgwsi7iD`oUSdl>~1&WH0xHq_H62nS}ATe zqnTo>NR5g!u1d|gaX4-aM8%nzXeKVz7U~V9id&_qQf$SIe0?K*sI+(`Q%YThQQIt& zgrqy;W~)&l=DgSgm;Ho}JbZk6hR(zQlZt+m!} zOmh|^dQ$KQ?QX?M#>+A}tuz)NnpV||;`XjJC#^DdNlW|{ms(0p#}7p1YAX_97Mrd= zOT|iMx>&k0D$YV*>&hr<)gspuIU0IOyL^u{%B^^%xmse^*UGw9x-xFHXFV2|4!f-B zp}4r9yeV-YYBtNyrlhDUHep=Ws?Ame5vCVsm4>g5Epc2`oj%rzo1VEkmOEIf)JiIR z$Kvd4+;CNB=M8G%D$!h=n}dbonJHZu=u{5~S}Unq(JTzmAm{d`PLvm7#*Hhg#gB#> z;>vVwX~c77N`aenmrje~VrgFDPeTP7I$n#9*2|69jlWzW#_kLv@xerGKU`XnU_~DC zr;~@el?-e@R8sbwi(5*t3pugm>C$|$u{J8#X+7L@#uoZUadat{yP{Nw88_UNRa-Wl zQkRkeVTp}Zp|T^1M1pUxh$6XzJc~-jda+b)9g8aEg>p+0szhPd&c42Dko^4G11-1q zMbV+fc+pX7YhUYFJ)VhYBbaTm)JoGuSGmV3WhNXwl$|@EAET5m@b^^1^iy42*f;C$ zH}zy7j!RLl&csJ`S>#I}suZW=%4HMdJ9kX&8Qd|vXKK&z=oKBRXZ1-d=Y@sxGZ~}| z4>51)LPGxfgkl^=7%yA7y>TP=)MIC#snHZ4ZFxTACS=nEV>c+ZWMVs4OxinC=J1r~engHKUD=GAoD&1AjZDtz zPu(DOrBp@$}$4+V*+j<2mZH`0+Gm-PnLPy75+bz zd){64gc!G6#dt%lqYN=oZ3(+o$`qScNU70lRG9IjWF#Elm}2N7PV}b=>~J^Sg8Vb_ z^x~Y2ib`z`_M5FK$J`)QCT{#jFeo(izmfN)#YQ|6?nljfS$~r&e`!q3mgh*or3}+I zoNDBHcJp}%J5!ZNr1p&4G((*=+jtJ-mM2f^XvD{eli^`qn{L#ujH@owO1XNazYw|U zv@chKD4JL2j?5TmCfu!ToUjm&t06h2r8!Vryw+$GkHJs+Fxk*0HgdD9Ytxn`R-4=; z5!vLz-@2YlJYQ?JR3Y7fVf`#g?)DeTn!l0GaqcgFSrxO{+N79IJ~1^mylZOb@X(&Y zvEf30Ctgz3#7w7`tEI}~OdPGU@aCb~!or66Rk}VaVK+HGJT-ZMK3|J0V0cw()w$?M zxuuz<+>B7l<{HI?4bjM~6c;AFIF8PrZ?)>@ZQOX|$dL_m)x{09#vB3ErMTLRn;W-p zP(BB1i&1mFwpf{A5<47g&Wp>3B*u==tByErOPmb;1H$H4-4vrp-E*Tsf8yuZD zxYgRA86iF#SFNn_LcLO^21knYaFywSI_w=D8`@3!!5t%eMkWrzPbzI9)A9%+%rIFr z%F{@_8O^(@_F(DwSwDjuxBo>>_h1DWgk9CU)wdPR}zoh_BK8nCXbH!8~ zh)|)MWE7pzseQ95w&{yKFFHowm5Nn0XMm1NTTi8{GaG5kLT#ozd(5|Oi2iJp=jK~c zq0}F3*|hno(Zt}s@n|P{{6I8PEp3R-f7X1fSbG|1O1s~H7Qel3_a0t{u6+9Gt1-tx>;3c2}=D{3T4;}%&hmHOja4UE{I0k0G zFxUwm4erBc|2DV{ya~J!Tn^TOlfh4+@i)Qez@6Z;Ky?2&=zV$==ux0YfnSOOr{i)d z&tfW}=V(SL)W_om3^kPKX1p-v?Y_3FM~e9K@(EGqbpdZ&9c6j_cKxtiWVy(%W7a?c z-F{~M_QV_*h}IOGy0(7%p?)hiYo;-2HfG4OU=ABO4*OG6X#I9PG~U*xJd`zO&s}B) z8m1R*2zg6uC!f(;W1(m^NVtIqVq2sc?!wrtD0JL~`ed_URSAhO2S~NH?$x+(8I_mI zYOWfpdZTuluR|8WHfCcv5DJ@}&!IMJjo_Br8R%QCijoP%2BJ;;v%>wwv1VO`@TEf1 zl%4Xo9&0%=x|BhuCUs)A`aCf7l`&sw#$}a-)-qK43X*QVsN>hymQ8<2tKYZeiqy}mOm^(QIM_eLbI+`*vL!@`860tG*@i#D;h=E zwZ&F#0ox<7DvM2go{Xxb^VV;dIp!F6o2j`|#tB1vMutZx+-P*80#CYQ&$T5ts@>)P z3&RL1EI*7a!LMK#A;@>iolscSgh{asFrblo*gyzZB<6XmXQI`(0B4%5^P<&Fey);p znISje7hU8a^nSr8azhfTW0-kYKI4qRXDo`HYL0#?U(kvTav6&j1JwPX8(MqfDpG&8 z5?A|^L29l?U1FGR5_9}N8u;ltK){Vw$YqI^+4YRAtF+$`H*M_0!%hE_KvxSVTxb)C^P8`*$K>N_&Z>|ze!r6u4$ z2u1OrD+$N5l@uk;p$^ZbLn2R0mz$~?sF_k`l4X|J<>@jxiXUlbp<7)>Mu$4qMfU#_ zQ95r`AUpPd(JP>z#K!*wcr|zxI1C2C8Q^4a7dHK!;CI0|co29scKp-9W57RPx8DGE zfd_yOV5c7evfF=vEv~ir5|{)NU=8?J?C`$>Hv`$_PXk-PW^gk29(MR0;4i>+U<&L9 z`@mDc&#}Qj2i^gG4`^-uVsH<(_ub%w;5p!0a0xg7WSifI{rx@g1@I|wD|jFHD{vFI z5}XgV0 zgU5ic(GMR4>W>eAiX@P4Y*=_iGZJ8oPA0&T0o~SR&>if zCx@6VTv;xNfDI<$lTnsj&S-?1QZMkjYq)%#v8*w66jz$D>1)1j$~B}zMp<=RM|UnR z)DILZp|(uW8nHA*$q_@E-;>cRaURA+v}R3onhns=eN+2~$Hqq(qB{;w4esAxP|N*1 z)aLE?ihyNA<;f2~oXXqL!3)!dm zsRyBCA*~A+t=ki2Iw_V%uiC^+1o2gt)!gMc->OwB9rEz$d~0&4bS+46*hd#KiR6*xWC&%dS18milj&JC(77BWg4_gu zU~tbAdW}o$3qV7Oj?9-Uaa7pUZ#2I~gkSb^d7ry&BWf%=(_1yPN@t&tPFK^qS=9zh zEHpP_m(-=rB4#N?wS$7yR@GaVC7?HJp4(eSA6PHlA95f z$I=O*a~7@-bx(cXRUBnUK&OmCO-%A=?Njo+>%>0K?1(q7J)>ME%ju%+4`zRdO!5-T zWLvOzv&lRE?J^NX3RYL?!4~{r!dZgbt_S6TcvD=YXk_?v*Sf~t52@I>3jS)_nCYmy zS&7T#(w2!#=!{8fY(`5HU#9+KDrTOwxLl{TFo^}-LX&8skM#4)!kp4sG_iX#UR5vm z>2K5U7S-t68iep4^N#OrJS~gKH}AuEEOXc{}vu25$06Oylqi!`dQ(T)K<^otUyW0hhAl1w@e`jM}YqI;MJG9L4wd1g8^O`gqn691`I{8N>7 zt-=cI8YRg7e=BzFSFnR+|L^h!=jXBUZv{_j1r*=^SRlVZAGi~rz{kMff{%b#fulfc{Y|hM+>L+WHQ+3; z85F>iz<&n+3H%uUz(>G!;F(|y>;bz$1n$8B{0AQbw}3hDB=7+6Q~3nIP2ibe3Jih^z;^Iha1XwK z+rW!~d<2`pJ@^5>2yOwt4-SC!;L+eG_yoQQ?gBdF;0~aV*xb3s(r*QA#lgIfGdOl+p zFs-@Vb`DMq7Ut7nS6fzbHlH4`fG`XFnol0t&EigBP2D;EeJ#9(eVWU3WPECP@BWE{ zX)0F7GI+a)UeVi-+RoF0wp91|@&Pb}AojY70Pw-b9n zgbv;kbAwo-#AD+8AddS|3a13dLyI(Ip%dbInF$G#IV&?Uy22r^I@&W#R?5n|;xC6G zW2>!o!UQwxw+X}JHY?SS1@A^k!7463VDIuMtT8Gm<79<6xsqhmp;-lO8t7-yCTve& zYel^iYvh~LHkR}2W3%=C+y&-a#wH6jp+*DF#T-U@hicvL3P~u`hu}Xx8{NUJ5F_LH zOL#D(Njy89617m#X6u^l*Cr3TYn_`GT9)_@gKN)~mfTKVV|IxtIq^gWZtUucbgH&< z%{h6poHNofpX5-V>d>JE!8wF!y}+}6M`b!7#P({#Q`2Do>g28y} zEekIi+~|Sm>$f}Cg$xsU?V_u%b|P-6x7$7MCb|`oh%9XZ%(O#a>yI`@ zTY|S;Z99#u%gw1`sYL{{73AitB_;K1NMF|Vvc@KgoSk{F=O0|ADSEUy) zj_pEcxyR_@C9Yc4)vR7y9(CiPYjYD{>Hf`x9qO~9c`&JoI?G8+KR+;fJ!QrfPM2z-2@j-QX4^VC z35q29lwxk9r?vuUXQueJRjwi}nu-c>TCWt-@?5n>6nnX~%H+(F>M>SUz*{OG{&rhY zq*U%^BqSG%pCk9dli!-EwrWN1clLW^HZ;rCDeZ_sH{$Mmv%_tRNy)4Ms!3rn$mc$> zcl<&e0=xEQ)_QE6P$4e)f>l_zQj>eGNP416JOJSV$({*Q-7TPIz;^yrC~~&uV{8U& zi^`@zq!{qF+r0e=Q=051epupRt1_zw2} zSHTy-=fKCn?cgTR1PyQn_&I)npMVzu#TQJ2XMnT8H?aHv5quWB5Ii3!#$Xx@gPq{f zK=B3d1pf_O4-SG$!BfB*@B{n-?*MNHF9S~pXMio>U-1dN7c{^ja4FaZ6kBitct8Gt z_kmY{=Yq??CEx&94W10XkAL9rK-l9y&ENOo7q}UW0>u(+0*?SEf`3Z<172kbTb?$^ z($i=j^6cUyzR80Q3p`FM2yEp=fM1d`=W3$q+GtG&q%XFsDCltC24-S3k>83&zRxHb z-Z_EOescw5pIt{LPBAx^4E!Xv?zraX00R-_ffZwRYaW?~;Qp5d3uhP1O^P^?$1|S7 z3(YyV$W1(~ZR+iJ>)3^`f3V-BFlwdGWQ+*29`sv%*WPbk)?Lj&#`^G% z3$9C_aO~vkzb1j1!?%wgKEq^gZjX3=^xJE?8O_WTOyWR_%241-i=F+~o(Y=L;`j zG8|x!U>%uhN!zNMnYTKj64X(|}lZjoKvL!@XQy(%j z*M?j63BQjuRS6(PQ1#J&{Z8<{E^awbg4E7bLVJt`=1ufaeyME>gQ&#Fceg1#YfA4& zT_vBy2N!HEA9bBgx{SxJkDbL9jBimJVZq#Th7!TtRLn>wRK(p5X6ADCm9OH;4Xj{` zXLai?x_woST7p9qNlo~Kl(*xGbYmnjC@QwIp+_yk#L+TmU3IES@|3r{1D(`Mt>H$a z){tj8v4~`W%od3!u|PeWy`^@pq@s2ZM%-vjRf`L80n=6Ii&*V*?!zvmG5x?r3m#*~ zZqqhPw}|1wA-s~(6EeAir6id90d7eb2CVTgCG-!zXC-eta>)6g{kH9;N2b6;nr&Qn zt3M@v-TdSJ55J)BahL6R)O~Vpau)nLEY170o)@|s+ZLSVJT*=&r;fqH6E26E+AZ__ zxJe|LVQhggGrPbo!J+Q`ZOB4aB;1Z8S56443e)r&H(`>{$4R=rKx`&bEy>hk8naod znc2H05tJa~eG7Z{by+e$D_qo`a4d#vr4L&3Js#rAa$_oG73C#jiBPt=%`Pa@ZVc>> zIf0kvQCd#~~hfi}*A)rISK z`ofTliKojy5oq8Th6JSwyQtcnVvg_`k?Au`2BjTU<9IK(%;&?0J#D)a*Dr4^S<>nx zJ@eFU8{fWSL-cNDcF-f)gPcO6{TSD6>v{D}hk!Zitt6>8)j7=FwIGi&A$`ir-3&7= z9b~h#?)XKr-8gG|rD3T&d}Y!dB4RJ;ODS=zgWw!+68LBA`?rBNfSbV)a4v|z$Fc1{3T^|pfZzkr zIRN9}OmHVQ{%65E!E=H38jOQ&;Bnv-pfmm65B>-|4_pPF4K4=#;IZK0;JeuR9|j)+ zuLO%=9E^cez=_}$*#0+xMX(J#47?JXe+$?E27vnYKJNp7V(p){!Du@Y-E3>ydNU_r zdC8kd9kH7Dl7msQqXM2@hT&u^*hyh9pbmHn2kNk1AamXPfZI12K}?lpgnL5SkzwI-RzFZQgK*pPTWSQP3%GBs1a?RvZL6T{-lSg zFkfz;9${>i>t~vKGuRu-v_lyzz?x4&V|Cd>gHmyYG8v*PUz_(_c9NL6WByE5T7sNf zOd>N`R8jx9(ZNqxJM3V(!1{h?Ku8x9v*qezbV{|pu&oZ(NN17do>#(-Dw(6KZ+}KN zOI4Y@abqqHRc8!{NG$iVm*2L`(uzLi?lKCrR!3T*a6$cOsxJg;kFd#Z2yg^Pog5;e zjS=}Nn4erZ&YbmTGKQ*Iw>ln81-fZ0y{0m|*uR0vtnzJgLqkHHXG?vw%eKd^iuR9< zES`O4v}#f2R2oOQq|^{9$?&RJ7bizr#T9DF7+LR`+?`66-nP*LH_&=5Y$9&Mi3Gtc zcG@mtGY$R|!@3A5HD8fPo?Z;htfiy=5kVXi1y*Wnx)RBToTHY%fVKu3bm`<|G?BA< zR<0AB3Kw(_w~*;r#-yP!k2zkQ1UPO#(z5K*-fY@ak z7;C{pevQRgK+m_|IL;_=0UsGdpi&N_LKi8;Sz*iey`usXiD>SF&jZ6|qR+(^j(Do5 z)|Ifo{YeU55VXZ37>UA|_77}vbAg))) zaAG@j0JUx(^_FkA&#R?Fy$1C-Zv4VEaO%4$ zxS@DSjc!%Yb)&Bv^vEv*C&rvoEaB%OZLA$`>_>e|KDi8`uhGScKv&S&i;EP zh`~W{4mbt;7#m+_0DcvG2)qW|0A2(x1KJ;O23QY%hTVTR_&)eY@CNXD@LKR1p#1`) zU>$f6cp&%*w*S509`HhNDG2_7hx5050xtnKfoX6q_%gQs2Z7G|e-=0Z9uFP`v_C-S z|9usF8R&ffw*bWgTnnBJfnv3~mAM051U7 zf#(68S+E;C4*VG2_Q?10XXcO-!h$6B4S zf!az-$eUV3xElPCS{4&*YAJJ;TUYcq*R6}T%a}?pXWNkIcyL*kZ5wGgPhBsVT0dBRM(NIQgR{=%q{64 z*vhfo@?)YlBz9FxW++wW7;eo(j=Ne%GaMBz$)#hLMPY2_tgfHcQLNoNy>=>@-+2#M zAd_Vube(W&pQ`7aw^pLuw-}F&uDV#ZOe1$zBQr;yE?8dSY;JdexF0S|B(6wk&2i$n zgwLv#G1}d2sKpw2X7@lhrLuiicU(!UvLYZ$wYG7q&R3Cn+gG_dx#F}#v5Bvllt#G3 zFDRwgWrT`*sJDBmgHh&8T9~FW+6G6w<)@4jqA&HQ2Wr`axvUaeQ>JywkQKb7Pn=)= zEU)F-jBIH_d@zM8$r4|_>lxP>NN`}m@m-&NrhT7fet_)tEF=PO#0oa&fK zkl8dx3Yq)(1SW!0ZZF4#5|L!fm-(Fu6X~oOHWOIyxhcvswfE4Gv1^dO?2aD2f0_i1 zYU>?x%?&SH?rxx4y6jY*tT$#ymMG=(%%&yR>`$ff0cDo0k6ZR%m=KtC1-D_`@(XU; z&U7a;RUxAfbrcCw-pI{qeKuaSZpwpFlR}8&(wCaR8TPglUpaAAwke#oZs*R-vd-iF z^8BDNYlGGg2{%%yE26Tom`rm{I!_*$8k-#Dg#TTGlY4ZqBx^P>j7@=~3aD|X`HTP8 z%a`F|?6ZV-1FF6E`b{&;On%)D$O+YTV>UQ;DNrv3us1bL7hAl6Mgrf5o|ibWUN3zK zUR!=i_w$ZCKlqhq-8v2{*4~HvwA#{*=S8)ZVt`*l!AM-w}L7Pw@^1ci7I7t-#z)G+P{$B82q# zr8D7Tl}Yo;!Wv$kw7^v2Ckj_n+xqP999j@9*7#!cvuURstlH5Ld$|M(s_UxIaX5}# zU?_j+;f<~q(m;8x+Cp=sZ3+2AOHaZuy>#R>*M>xpyVtg`vw4B_72Sgb&K3)CC$j%B zP5Zv+*Z-^D{{J{O{vAN?08s4z;C|51b+a|0rx_PJHfkw-p|(r zhrpx2kFdo*2Hpp52G;=D>}$ajz`tW}-wnPG?f|mYwZ?x0On?Uit?7RW90qkT1D*+V zF2D#l3w#lKdk$OzE&>;VGr?!DvEK)DX5gQKSAds;X`uCfoeiKq{6f0cZ(}s$+%#1& zbq+027PX@9*MebJY5E(@v4ly*j|I;-o-3tYA-MP-mg*?ml|sTjqwD#Ha;vXP**KBX ziK^lxllKRzOdI&2ePPF>cV26#bXhl7o7~F&GM+)ppBaW?@&S~b!D+|eKmmz6`-xA$ zD>tsPsoTT+fO?{^rJt9F@PVlAnFV`Q$8@ao!I54F@R=F`f!c70rClyzs_>c*AA*1v zVtC)K4epI7>`73d>fs`ju{ocI2QM5P8P##g!Z!aZJHCQzT5J6dTB%WU79B6lmMxE z=UwWaF4Fo;*e)eZm_ovY))wt*y~LjNR+)Fv@E$QbWIE zQ-d5gjp<;$3@^bnXnGx7{Z<{l^so}Q;uRMw0+~WHMrC!9vYsx~i5f^EFZBZ}9|t|# zBaCXA&Qi)|j)jC5UAzH$CVw#G^2dDYK@FrK652|-8n#5%hsir-QWch+53=S)yCXZJ zv6vUgwzJCmOhk&85GbKiNc1{0(a!pA$;$z~^<*Tn{j#ooMj^ev$DLg2CSk7*w8!2t zp4=3w#u@>BHDqdQsMSuJcU=vg58n|j4Uck)H%(stRfN9Pu37sh$9H4+@ctoAHc%T- zF7)Mon1~CjR7#ULoYdL0-L8fBY-e@w1|)0^y$;Ofqn0EKvZKlS60!%NST4>S(y=F) zM7aY{Y(@FX_!#YT765;Hd%|j(IRxgHx2e>JjKOMC)>N>%;9_^^RIs#syio|$BsL>W zYyu~YW9iAFgQ6c;Z*l?jc0pbty;q6Z08V52swdjBdchG-x9Rc?+_k=FL4m4v?LY$U zT}kPIW$F$-4IwpdcUf3Vqbjfixy2PRbK1AshB{jBEu==5 zZQgS370}AgNX>FEEgymi$A90X=gr_P;2ANRW7UyPv9GDXQI#p%rOhh$D%mxW#w5Ar z)cY>VeP&n~9kR1Xa@w%Om#<)Y0#G`hOwv-Sdp6sW_S{jfI}C)noP`6TZqhjk`!ikO zLC}!dzeu=k=ah-dVvTZlGJ>P#^uzV8q&6hlqv5TqVFK{ANta792^kzp3cW)OG-$ij zz-X5oXhbtrQ6AL@aD!zfT%G6VZk(Lq;lRDiac9attbU6MNdS?v*;Pe&r^{kDTGyCe4s{6rc_9-RSWX}FT0>0dm)jZ778ffV_S>A_wOHlYk6N{0hucWHOidizKRmT#WOQ)s zAVFQ`^dE?J503AiVxx;RVCysH$d-9)+>U+D9&o0%cQ?5yGpVGUNK#u8>jTXuth%6! zdNm1#%$$nV@g$RE=3>RD(|IP|o&KwJN78dn%H5w6%`~sc{@0mvI?rC~{|CMO|0V4F z8^AT-DzF!v4desRy8lPPtHE=?ESLbJ;2iJ}@I`F?cLALXsQ3M8o&P5A$KWz>KG+J@ zfIG1LZw2E(@&D_;V^nb>L)h68I8!zRm`GHaG?@2CKkn;BIVv zt^2;2eyLqz-izF@L=#MZ2#wj>p&5VfG2|poCR-2U*}-=^bRzJIF$3X2@to!O^=5O-EtnrFW2pU-E=Ji51^F$U;kYA*ez& zfp^M8`lQYlnUQkJuk%3`!GByb-5DnaBDLx5 z|I4wHAAub$`+v1p!+(sOe=GPa@E&kI*a02{{ssG9=lxBCVQ>K`fV;5eW$P=p|ApZB z;5wjp0-gm<0C!^79|nuyEO09L7s~zw_)~BQTnZG=-w&PubjF|6Z-pj8FIkTQJqq+F z(4#<)0zC@!DA1!oj{+;9fZQ9-d7OUt-l62et9$QIO4fCQQ)-EiD;r7Y_kLzqi)~}S z?c-zTw@aHWHn$Y!({><-xy&%p@PYN-p=2xi8lWM%D-$$&KVBLUXsVprB173$6k^Ws tJ>I`JM|7;#iryWOt&qZ*oD*~fua8JYl<^MceD*y`a;0{EOM9m6{{g%nAGQDh literal 0 HcmV?d00001 diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index a688e9981e..ded53ab4ea 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -39,7 +39,6 @@ void* rpcOpen(const SRpcInit* pInit) { // register callback handle pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; - pRpc->mfp = pInit->mfp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; @@ -119,7 +118,12 @@ void rpcCancelRequest(int64_t rid) { return; } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); uint32_t port = pEpSet->eps[pEpSet->inUse].port; - transSendRequest(shandle, ip, port, pMsg); + transSendRequest(shandle, ip, port, pMsg, NULL); +} +void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { + char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); + uint32_t port = pEpSet->eps[pEpSet->inUse].port; + transSendRequest(shandle, ip, port, pMsg, pCtx); } void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); @@ -140,6 +144,10 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } +void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { + // + rpcSendResponse(msg); +} void rpcReleaseHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*transReleaseHandle[type])(handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7100c34845..18a0611b75 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -30,6 +30,7 @@ typedef struct SCliConn { uint64_t expireTime; int hThrdIdx; bool broken; // link broken or not + STransCtx ctx; ConnStatus status; // int release; // 1: release @@ -207,7 +208,7 @@ void cliHandleResp(SCliConn* conn) { STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { - transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; + transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } @@ -283,7 +284,7 @@ void cliHandleExcept(SCliConn* pConn) { transMsg.ahandle = NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { - transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; + transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } @@ -374,6 +375,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { static void addConnToPool(void* pool, SCliConn* conn) { char key[128] = {0}; + transCtxDestroy(&conn->ctx); tstrncpy(key, conn->ip, strlen(conn->ip)); tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port)); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); @@ -436,7 +438,6 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { conn->writeReq.data = conn; conn->connReq.data = conn; conn->cliMsgs = taosArrayInit(2, sizeof(void*)); - QUEUE_INIT(&conn->conn); conn->hostThrd = pThrd; conn->status = ConnNormal; @@ -446,6 +447,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { } static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); + QUEUE_REMOVE(&conn->conn); if (clear) { uv_close((uv_handle_t*)conn->stream, cliDestroy); @@ -455,6 +457,7 @@ static void cliDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; free(conn->ip); free(conn->stream); + transCtxDestroy(&conn->ctx); taosArrayDestroy(conn->cliMsgs); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); @@ -630,10 +633,12 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { if (conn != NULL) { conn->hThrdIdx = pCtx->hThrdIdx; + transCtxMerge(&conn->ctx, &pCtx->appCtx); if (taosArrayGetSize(conn->cliMsgs) > 0) { taosArrayPush(conn->cliMsgs, &pMsg); return; } + taosArrayPush(conn->cliMsgs, &pMsg); transDestroyBuffer(&conn->readBuf); cliSend(conn); @@ -825,7 +830,7 @@ void transReleaseCliHandle(void* handle) { transSendAsync(thrd->asyncPool, &cmsg->q); } -void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) { +void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* ctx) { STrans* pTransInst = (STrans*)shandle; int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle); if (index == -1) { @@ -835,13 +840,14 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { // imp later } - tDebug("send request at thread:%d %p", index, pMsg); + tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); pCtx->ahandle = pMsg->ahandle; pCtx->msgType = pMsg->msgType; pCtx->ip = strdup(ip); pCtx->port = port; pCtx->hThrdIdx = index; + pCtx->appCtx = *ctx; assert(pTransInst->connType == TAOS_CONN_CLIENT); // atomic or not @@ -855,6 +861,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); } + void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)shandle; int index = CONN_HOST_THREAD_INDEX(pReq->handle); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 7123593a33..2c90efc3aa 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -155,9 +155,9 @@ bool transReadComplete(SConnBuffer* connBuf) { } return false; } -int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {return 0;} +int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) { return 0; } -int transUnpackMsg(STransMsgHead* msgHead) {return 0;} +int transUnpackMsg(STransMsgHead* msgHead) { return 0; } int transDestroyBuffer(SConnBuffer* buf) { if (buf->cap > 0) { tfree(buf->buf); @@ -224,4 +224,56 @@ int transSendAsync(SAsyncPool* pool, queue* q) { return uv_async_send(async); } +void transCtxInit(STransCtx* ctx) { + // init transCtx + ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK); +} +void transCtxDestroy(STransCtx* ctx) { + if (ctx->args == NULL) { + return; + } + + STransCtxVal* iter = taosHashIterate(ctx->args, NULL); + while (iter) { + iter->free(iter->val); + iter = taosHashIterate(ctx->args, iter); + } + taosHashCleanup(ctx->args); +} + +void transCtxMerge(STransCtx* dst, STransCtx* src) { + if (dst->args == NULL) { + dst->args = src->args; + src->args = NULL; + return; + } + void* key = NULL; + size_t klen = 0; + void* iter = taosHashIterate(src->args, NULL); + while (iter) { + STransCtxVal* sVal = (STransCtxVal*)iter; + key = taosHashGetKey(sVal, &klen); + + STransCtxVal* dVal = taosHashGet(dst->args, key, klen); + if (dVal) { + dVal->free(dVal->val); + } + taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal)); + iter = taosHashIterate(src->args, iter); + } + taosHashCleanup(src->args); +} +void* transCtxDumpVal(STransCtx* ctx, int32_t key) { + if (ctx->args == NULL) { + return NULL; + } + STransCtxVal* cVal = taosHashGet(ctx->args, (const void*)&key, sizeof(key)); + if (cVal == NULL) { + return NULL; + } + char* ret = calloc(1, cVal->len); + memcpy(ret, (char*)cVal->val, cVal->len); + return (void*)ret; +} + #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 321a3489b7..6be664233b 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -403,16 +403,16 @@ static void uvStartSendResp(SSrvMsg* smsg) { return; } -static void uvNotifyLinkBrokenToApp(SSrvConn* conn) { - STrans* pTransInst = conn->pTransInst; - if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) { - STransMsg transMsg = {0}; - transMsg.msgType = conn->inType; - transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - // transRefSrvHandle(conn); - (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0); - } -} +// static void uvNotifyLinkBrokenToApp(SSrvConn* conn) { +// STrans* pTransInst = conn->pTransInst; +// if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) { +// STransMsg transMsg = {0}; +// transMsg.msgType = conn->inType; +// transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; +// // transRefSrvHandle(conn); +// (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0); +// } +//} static void destroySmsg(SSrvMsg* smsg) { if (smsg == NULL) { return; diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 31015359f4..deccd633d8 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -86,18 +86,8 @@ class Client { rpcClose(this->transCli); this->transCli = NULL; } - void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { - rpcClose(this->transCli); - this->transCli = rpcOpen(&rpcInit_); - } void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); - rpcInit_.mfp = mfp; - this->transCli = rpcOpen(&rpcInit_); - } - void SetPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { - rpcClose(this->transCli); - rpcInit_.mfp = mfp; this->transCli = rpcOpen(&rpcInit_); } @@ -156,10 +146,6 @@ class Server { rpcClose(this->transSrv); this->transSrv = NULL; } - void SetExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { - this->Stop(); - this->Start(); - } void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { this->Stop(); rpcInit_.cfp = cfp; @@ -252,23 +238,11 @@ class TransObj { // srv->Stop(); } - void SetCliPersistFp(bool (*pfp)(void *parent, tmsg_t msgType)) { - // do nothing - cli->SetPersistFP(pfp); - } void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) { // do nothing cli->SetConstructFP(mfp); } - void SetCliMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { - // do nothing - cli->SetPAndMFp(pfp, mfp); - } // call when link broken, and notify query or fetch stop - void SetSrvExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { - //////// - srv->SetExceptFp(efp); - } void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { /////// srv->SetSrvContinueSend(cfp); @@ -375,22 +349,15 @@ TEST_F(TransEnv, cliReleaseHandle) { req.pCont = rpcMallocCont(10); req.contLen = 10; tr->cliSendAndRecvNoHandle(&req, &resp); - // if (i == 5) { - // std::cout << "stop server" << std::endl; - // tr->StopSrv(); - //} - // if (i >= 6) { EXPECT_TRUE(resp.code == 0); //} } ////////////////// } TEST_F(TransEnv, cliReleaseHandleExcept) { - // tr->SetCliPersistFp(cliPersistHandle); - SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {.handle = resp.handle}; + SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -459,7 +426,7 @@ TEST_F(TransEnv, multiCliPersistHandleExcept) { // conn broken } TEST_F(TransEnv, queryExcept) { - tr->SetSrvExceptFp(handleExcept); + // tr->SetSrvExceptFp(handleExcept); // query and conn is broken } diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc index 53910aa30c..1f8c8e8ff2 100644 --- a/source/libs/transport/test/transportTests.cc +++ b/source/libs/transport/test/transportTests.cc @@ -136,4 +136,98 @@ TEST_F(QueueEnv, testIter) { assert(result.size() == vals.size()); } +class TransCtxEnv : public ::testing::Test { + protected: + virtual void SetUp() { + ctx = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(ctx); + // TODO + } + virtual void TearDown() { + transCtxDestroy(ctx); + // formate + } + STransCtx *ctx; +}; + +TEST_F(TransCtxEnv, mergeTest) { + int key = 1; + { + STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(src); + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + transCtxMerge(ctx, src); + free(src); + } + EXPECT_EQ(2, taosHashGetSize(ctx->args)); + { + STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(src); + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + transCtxMerge(ctx, src); + free(src); + } + std::string val("Hello"); + EXPECT_EQ(4, taosHashGetSize(ctx->args)); + { + key = 1; + STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(src); + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = calloc(1, 11); + memcpy(val1.val, val.c_str(), val.size()); + val1.len = 11; + + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = calloc(1, 11); + memcpy(val1.val, val.c_str(), val.size()); + val1.len = 11; + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + transCtxMerge(ctx, src); + free(src); + } + EXPECT_EQ(4, taosHashGetSize(ctx->args)); + + char *skey = (char *)transCtxDumpVal(ctx, 1); + EXPECT_EQ(0, strcmp(skey, val.c_str())); + free(skey); + + skey = (char *)transCtxDumpVal(ctx, 2); + EXPECT_EQ(0, strcmp(skey, val.c_str())); +} #endif