diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 02029a996c..c2cce3a05d 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -52,8 +52,8 @@ typedef struct { char user[TSDB_USER_LEN]; SRpcMsg rpcMsg; int32_t rspLen; - void *pRsp; - void *pNode; + void * pRsp; + void * pNode; } SNodeMsg; typedef struct SRpcInit { @@ -87,7 +87,15 @@ typedef struct { } SRpcCtxVal; typedef struct { - SHashObj *args; + int32_t msgType; + void * val; + int32_t len; + void (*free)(void *arg); +} SRpcBrokenlinkVal; + +typedef struct { + SHashObj * args; + SRpcBrokenlinkVal brokenVal; } SRpcCtx; int32_t rpcInit(); diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c index ed1ad7a374..c7f964f2ba 100644 --- a/source/libs/index/src/index_fst_automation.c +++ b/source/libs/index/src/index_fst_automation.c @@ -154,15 +154,7 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) { // add more search type } - char* dst = NULL; - if (data != NULL) { - char* src = (char*)data; - size_t len = strlen(src); - dst = (char*)calloc(1, len * sizeof(char) + 1); - memcpy(dst, src, len); - } - - ctx->data = dst; + ctx->data = strdup((char*)data); ctx->type = atype; ctx->stdata = (void*)sv; return ctx; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 8cfde8267d..32a0cf0d54 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -277,6 +277,7 @@ void transCtxCleanup(STransCtx* ctx); void transCtxClear(STransCtx* ctx); void transCtxMerge(STransCtx* dst, STransCtx* src); void* transCtxDumpVal(STransCtx* ctx, int32_t key); +void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType); // queue sending msgs typedef struct { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c0ee9b9ca5..ea73e07c80 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -210,6 +210,9 @@ void cliHandleResp(SCliConn* conn) { STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); + if (transMsg.ahandle == NULL) { + transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); + } } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } @@ -282,6 +285,9 @@ void cliHandleExcept(SCliConn* pConn) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); + if (transMsg.ahandle == NULL) { + transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); + } } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 209475ca05..5684c332c0 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -238,12 +238,14 @@ void transCtxCleanup(STransCtx* ctx) { iter->free(iter->val); iter = taosHashIterate(ctx->args, iter); } + taosHashCleanup(ctx->args); } void transCtxMerge(STransCtx* dst, STransCtx* src) { if (dst->args == NULL) { dst->args = src->args; + dst->brokenVal = src->brokenVal; src->args = NULL; return; } @@ -275,6 +277,14 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key) { memcpy(ret, (char*)cVal->val, cVal->len); return (void*)ret; } +void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) { + char* ret = calloc(1, ctx->brokenVal.len); + + memcpy(ret, (char*)(ctx->brokenVal.val), ctx->brokenVal.len); + *msgType = ctx->brokenVal.msgType; + + return (void*)ret; +} void transQueueInit(STransQueue* queue, void (*free)(void* arg)) { queue->q = taosArrayInit(2, sizeof(void*)); diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 0b1b1834df..89cb66e9cf 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -364,9 +364,12 @@ TEST_F(TransEnv, srvReleaseHandle) { SRpcMsg resp = {0}; tr->SetSrvContinueSend(processReleaseHandleCb); // tr->Restart(processReleaseHandleCb); - void *handle = NULL; + void * handle = NULL; + SRpcMsg req = {0}; for (int i = 0; i < 1; i++) { - SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; + memset(&req, 0, sizeof(req)); + req.handle = resp.handle; + req.persistHandle = 1; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -378,8 +381,11 @@ TEST_F(TransEnv, srvReleaseHandle) { } TEST_F(TransEnv, cliReleaseHandleExcept) { SRpcMsg resp = {0}; + SRpcMsg req = {0}; for (int i = 0; i < 3; i++) { - SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; + memset(&req, 0, sizeof(req)); + req.handle = resp.handle; + req.persistHandle = 1; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -396,8 +402,10 @@ TEST_F(TransEnv, cliReleaseHandleExcept) { } TEST_F(TransEnv, srvContinueSend) { tr->SetSrvContinueSend(processContinueSend); + SRpcMsg req = {0}, resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {0}, resp = {0}; + memset(&req, 0, sizeof(req)); + memset(&resp, 0, sizeof(resp)); req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -410,8 +418,10 @@ TEST_F(TransEnv, srvPersistHandleExcept) { tr->SetSrvContinueSend(processContinueSend); // tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; + SRpcMsg req = {0}; for (int i = 0; i < 5; i++) { - SRpcMsg req = {.handle = resp.handle}; + memset(&req, 0, sizeof(req)); + req.handle = resp.handle; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -428,8 +438,10 @@ TEST_F(TransEnv, srvPersistHandleExcept) { TEST_F(TransEnv, cliPersistHandleExcept) { tr->SetSrvContinueSend(processContinueSend); SRpcMsg resp = {0}; + SRpcMsg req = {0}; for (int i = 0; i < 5; i++) { - SRpcMsg req = {.handle = resp.handle}; + memset(&req, 0, sizeof(req)); + req.handle = resp.handle; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -450,8 +462,11 @@ TEST_F(TransEnv, multiCliPersistHandleExcept) { TEST_F(TransEnv, queryExcept) { tr->SetSrvContinueSend(processRegisterFailure); SRpcMsg resp = {0}; + SRpcMsg req = {0}; for (int i = 0; i < 5; i++) { - SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; + memset(&req, 0, sizeof(req)); + req.handle = resp.handle; + req.persistHandle = 1; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -466,8 +481,10 @@ TEST_F(TransEnv, queryExcept) { } TEST_F(TransEnv, noResp) { SRpcMsg resp = {0}; + SRpcMsg req = {0}; for (int i = 0; i < 5; i++) { - SRpcMsg req = {.noResp = 1}; + memset(&req, 0, sizeof(req)); + req.noResp = 1; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10;