diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index d8dcf72bed..aae0c6bd22 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -70,12 +70,19 @@ typedef struct SRpcInit { // call back to retrieve the client auth info, for server app only int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); - // to support Send messages multiple times on a link - void *(*mfp)(void *parent, tmsg_t msgType); - void *parent; } SRpcInit; +typedef struct { + void * val; + int32_t len; + void (*free)(void *arg); +} SRpcCtxVal; + +typedef struct { + SHashObj *args; +} SRpcCtx; + int32_t rpcInit(); void rpcCleanup(); void * rpcOpen(const SRpcInit *pRpc); @@ -84,16 +91,17 @@ void * rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void * rpcReallocCont(void *ptr, int contLen); void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); -void rpcSendResponse(const SRpcMsg *pMsg); -void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); -int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCancelRequest(int64_t rid); +void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); +void rpcSendResponse(const SRpcMsg *pMsg); +void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); +int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); +void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +int rpcReportProgress(void *pConn, char *pCont, int contLen); +void rpcCancelRequest(int64_t rid); +void rpcRegisterBrokenLinkArg(SRpcMsg *msg); // just release client conn to rpc instance, no close sock -void rpcReleaseHandle(void *handle, int8_t type); - +void rpcReleaseHandle(void *handle, int8_t type); // void rpcRefHandle(void *handle, int8_t type); void rpcUnrefHandle(void *handle, int8_t type); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a60531a429..a939bbd644 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -14,6 +14,10 @@ */ #ifdef USE_UV +#ifdef __cplusplus +extern "C" { +#endif + #include #include "lz4.h" #include "os.h" @@ -121,24 +125,21 @@ typedef struct { } SRpcReqContext; typedef SRpcMsg STransMsg; +typedef SRpcCtx STransCtx; +typedef SRpcCtxVal STransCtxVal; typedef SRpcInfo STrans; typedef SRpcConnInfo STransHandleInfo; typedef struct { - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app - tmsg_t msgType; // message type - uint8_t* pCont; // content provided by app - int32_t contLen; // content length - // int32_t code; // error code - // int16_t numOfTry; // number of try for different servers - // int8_t oldInUse; // server EP inUse passed by app - // int8_t redirect; // flag to indicate redirect + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app + tmsg_t msgType; // message type int8_t connType; // connection type cli/srv int64_t rid; // refId returned by taosAddRef - STransMsg* pRsp; // for synchronous API - tsem_t* pSem; // for synchronous API + STransCtx appCtx; // + STransMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API int hThrdIdx; char* ip; @@ -181,7 +182,7 @@ typedef struct { #pragma pack(pop) typedef enum { Normal, Quit, Release } STransMsgType; -typedef enum { ConnNormal, ConnAcquire, ConnRelease } ConnStatus; +typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) #define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) @@ -259,7 +260,7 @@ void transUnrefCliHandle(void* handle); void transReleaseCliHandle(void* handle); void transReleaseSrvHandle(void* handle); -void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg); +void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* pCtx); void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); void transSendResponse(const STransMsg* pMsg); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); @@ -270,4 +271,14 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void transCloseClient(void* arg); void transCloseServer(void* arg); +void transCtxInit(STransCtx* ctx); +void transCtxDestroy(STransCtx* ctx); +void transCtxClear(STransCtx* ctx); +void transCtxMerge(STransCtx* dst, STransCtx* src); +void* transCtxDumpVal(STransCtx* ctx, int32_t key); + +#ifdef __cplusplus +} +#endif + #endif diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index e739380467..1395408960 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -63,9 +63,6 @@ typedef struct { void (*cfp)(void* parent, SRpcMsg*, SEpSet*); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); - bool (*pfp)(void* parent, tmsg_t msgType); - void* (*mfp)(void* parent, tmsg_t msgType); - bool (*efp)(void* parent, tmsg_t msgType); int32_t refCount; void* parent; diff --git a/source/libs/transport/src/.transCli.c.swn b/source/libs/transport/src/.transCli.c.swn new file mode 100644 index 0000000000..583fbc9f74 Binary files /dev/null and b/source/libs/transport/src/.transCli.c.swn differ diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index a688e9981e..ded53ab4ea 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -39,7 +39,6 @@ void* rpcOpen(const SRpcInit* pInit) { // register callback handle pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; - pRpc->mfp = pInit->mfp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; @@ -119,7 +118,12 @@ void rpcCancelRequest(int64_t rid) { return; } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); uint32_t port = pEpSet->eps[pEpSet->inUse].port; - transSendRequest(shandle, ip, port, pMsg); + transSendRequest(shandle, ip, port, pMsg, NULL); +} +void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { + char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); + uint32_t port = pEpSet->eps[pEpSet->inUse].port; + transSendRequest(shandle, ip, port, pMsg, pCtx); } void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); @@ -140,6 +144,10 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } +void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { + // + rpcSendResponse(msg); +} void rpcReleaseHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*transReleaseHandle[type])(handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7100c34845..18a0611b75 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -30,6 +30,7 @@ typedef struct SCliConn { uint64_t expireTime; int hThrdIdx; bool broken; // link broken or not + STransCtx ctx; ConnStatus status; // int release; // 1: release @@ -207,7 +208,7 @@ void cliHandleResp(SCliConn* conn) { STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { - transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; + transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } @@ -283,7 +284,7 @@ void cliHandleExcept(SCliConn* pConn) { transMsg.ahandle = NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { - transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; + transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } @@ -374,6 +375,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { static void addConnToPool(void* pool, SCliConn* conn) { char key[128] = {0}; + transCtxDestroy(&conn->ctx); tstrncpy(key, conn->ip, strlen(conn->ip)); tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port)); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); @@ -436,7 +438,6 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { conn->writeReq.data = conn; conn->connReq.data = conn; conn->cliMsgs = taosArrayInit(2, sizeof(void*)); - QUEUE_INIT(&conn->conn); conn->hostThrd = pThrd; conn->status = ConnNormal; @@ -446,6 +447,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { } static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); + QUEUE_REMOVE(&conn->conn); if (clear) { uv_close((uv_handle_t*)conn->stream, cliDestroy); @@ -455,6 +457,7 @@ static void cliDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; free(conn->ip); free(conn->stream); + transCtxDestroy(&conn->ctx); taosArrayDestroy(conn->cliMsgs); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); @@ -630,10 +633,12 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { if (conn != NULL) { conn->hThrdIdx = pCtx->hThrdIdx; + transCtxMerge(&conn->ctx, &pCtx->appCtx); if (taosArrayGetSize(conn->cliMsgs) > 0) { taosArrayPush(conn->cliMsgs, &pMsg); return; } + taosArrayPush(conn->cliMsgs, &pMsg); transDestroyBuffer(&conn->readBuf); cliSend(conn); @@ -825,7 +830,7 @@ void transReleaseCliHandle(void* handle) { transSendAsync(thrd->asyncPool, &cmsg->q); } -void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) { +void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* ctx) { STrans* pTransInst = (STrans*)shandle; int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle); if (index == -1) { @@ -835,13 +840,14 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { // imp later } - tDebug("send request at thread:%d %p", index, pMsg); + tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); pCtx->ahandle = pMsg->ahandle; pCtx->msgType = pMsg->msgType; pCtx->ip = strdup(ip); pCtx->port = port; pCtx->hThrdIdx = index; + pCtx->appCtx = *ctx; assert(pTransInst->connType == TAOS_CONN_CLIENT); // atomic or not @@ -855,6 +861,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); } + void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)shandle; int index = CONN_HOST_THREAD_INDEX(pReq->handle); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 7123593a33..2c90efc3aa 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -155,9 +155,9 @@ bool transReadComplete(SConnBuffer* connBuf) { } return false; } -int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {return 0;} +int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) { return 0; } -int transUnpackMsg(STransMsgHead* msgHead) {return 0;} +int transUnpackMsg(STransMsgHead* msgHead) { return 0; } int transDestroyBuffer(SConnBuffer* buf) { if (buf->cap > 0) { tfree(buf->buf); @@ -224,4 +224,56 @@ int transSendAsync(SAsyncPool* pool, queue* q) { return uv_async_send(async); } +void transCtxInit(STransCtx* ctx) { + // init transCtx + ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK); +} +void transCtxDestroy(STransCtx* ctx) { + if (ctx->args == NULL) { + return; + } + + STransCtxVal* iter = taosHashIterate(ctx->args, NULL); + while (iter) { + iter->free(iter->val); + iter = taosHashIterate(ctx->args, iter); + } + taosHashCleanup(ctx->args); +} + +void transCtxMerge(STransCtx* dst, STransCtx* src) { + if (dst->args == NULL) { + dst->args = src->args; + src->args = NULL; + return; + } + void* key = NULL; + size_t klen = 0; + void* iter = taosHashIterate(src->args, NULL); + while (iter) { + STransCtxVal* sVal = (STransCtxVal*)iter; + key = taosHashGetKey(sVal, &klen); + + STransCtxVal* dVal = taosHashGet(dst->args, key, klen); + if (dVal) { + dVal->free(dVal->val); + } + taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal)); + iter = taosHashIterate(src->args, iter); + } + taosHashCleanup(src->args); +} +void* transCtxDumpVal(STransCtx* ctx, int32_t key) { + if (ctx->args == NULL) { + return NULL; + } + STransCtxVal* cVal = taosHashGet(ctx->args, (const void*)&key, sizeof(key)); + if (cVal == NULL) { + return NULL; + } + char* ret = calloc(1, cVal->len); + memcpy(ret, (char*)cVal->val, cVal->len); + return (void*)ret; +} + #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 321a3489b7..6be664233b 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -403,16 +403,16 @@ static void uvStartSendResp(SSrvMsg* smsg) { return; } -static void uvNotifyLinkBrokenToApp(SSrvConn* conn) { - STrans* pTransInst = conn->pTransInst; - if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) { - STransMsg transMsg = {0}; - transMsg.msgType = conn->inType; - transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - // transRefSrvHandle(conn); - (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0); - } -} +// static void uvNotifyLinkBrokenToApp(SSrvConn* conn) { +// STrans* pTransInst = conn->pTransInst; +// if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) { +// STransMsg transMsg = {0}; +// transMsg.msgType = conn->inType; +// transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; +// // transRefSrvHandle(conn); +// (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0); +// } +//} static void destroySmsg(SSrvMsg* smsg) { if (smsg == NULL) { return; diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 31015359f4..deccd633d8 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -86,18 +86,8 @@ class Client { rpcClose(this->transCli); this->transCli = NULL; } - void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { - rpcClose(this->transCli); - this->transCli = rpcOpen(&rpcInit_); - } void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); - rpcInit_.mfp = mfp; - this->transCli = rpcOpen(&rpcInit_); - } - void SetPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { - rpcClose(this->transCli); - rpcInit_.mfp = mfp; this->transCli = rpcOpen(&rpcInit_); } @@ -156,10 +146,6 @@ class Server { rpcClose(this->transSrv); this->transSrv = NULL; } - void SetExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { - this->Stop(); - this->Start(); - } void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { this->Stop(); rpcInit_.cfp = cfp; @@ -252,23 +238,11 @@ class TransObj { // srv->Stop(); } - void SetCliPersistFp(bool (*pfp)(void *parent, tmsg_t msgType)) { - // do nothing - cli->SetPersistFP(pfp); - } void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) { // do nothing cli->SetConstructFP(mfp); } - void SetCliMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { - // do nothing - cli->SetPAndMFp(pfp, mfp); - } // call when link broken, and notify query or fetch stop - void SetSrvExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { - //////// - srv->SetExceptFp(efp); - } void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { /////// srv->SetSrvContinueSend(cfp); @@ -375,22 +349,15 @@ TEST_F(TransEnv, cliReleaseHandle) { req.pCont = rpcMallocCont(10); req.contLen = 10; tr->cliSendAndRecvNoHandle(&req, &resp); - // if (i == 5) { - // std::cout << "stop server" << std::endl; - // tr->StopSrv(); - //} - // if (i >= 6) { EXPECT_TRUE(resp.code == 0); //} } ////////////////// } TEST_F(TransEnv, cliReleaseHandleExcept) { - // tr->SetCliPersistFp(cliPersistHandle); - SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {.handle = resp.handle}; + SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -459,7 +426,7 @@ TEST_F(TransEnv, multiCliPersistHandleExcept) { // conn broken } TEST_F(TransEnv, queryExcept) { - tr->SetSrvExceptFp(handleExcept); + // tr->SetSrvExceptFp(handleExcept); // query and conn is broken } diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc index 53910aa30c..1f8c8e8ff2 100644 --- a/source/libs/transport/test/transportTests.cc +++ b/source/libs/transport/test/transportTests.cc @@ -136,4 +136,98 @@ TEST_F(QueueEnv, testIter) { assert(result.size() == vals.size()); } +class TransCtxEnv : public ::testing::Test { + protected: + virtual void SetUp() { + ctx = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(ctx); + // TODO + } + virtual void TearDown() { + transCtxDestroy(ctx); + // formate + } + STransCtx *ctx; +}; + +TEST_F(TransCtxEnv, mergeTest) { + int key = 1; + { + STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(src); + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + transCtxMerge(ctx, src); + free(src); + } + EXPECT_EQ(2, taosHashGetSize(ctx->args)); + { + STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(src); + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + transCtxMerge(ctx, src); + free(src); + } + std::string val("Hello"); + EXPECT_EQ(4, taosHashGetSize(ctx->args)); + { + key = 1; + STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(src); + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = calloc(1, 11); + memcpy(val1.val, val.c_str(), val.size()); + val1.len = 11; + + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = calloc(1, 11); + memcpy(val1.val, val.c_str(), val.size()); + val1.len = 11; + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + transCtxMerge(ctx, src); + free(src); + } + EXPECT_EQ(4, taosHashGetSize(ctx->args)); + + char *skey = (char *)transCtxDumpVal(ctx, 1); + EXPECT_EQ(0, strcmp(skey, val.c_str())); + free(skey); + + skey = (char *)transCtxDumpVal(ctx, 2); + EXPECT_EQ(0, strcmp(skey, val.c_str())); +} #endif