diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 60faba3e3a..9e4c9cc071 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -528,9 +528,8 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { return TSDB_CODE_SUCCESS; } - int32_t index = pWrapper->sourceIndex; - int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index); - *pRpcHandle = -1; + int32_t index = pWrapper->sourceIndex; + // int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index); SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index); if (!pSourceDataInfo) { return terrno; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 206253f9c2..500f75d4dd 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -607,6 +607,8 @@ void* destroyConnPool(SCliThrd* pThrd) { SCliConn* c = QUEUE_DATA(h, SCliConn, q); cliDestroyConn(c, true); } + SMsgList* msglist = connList->list; + taosMemoryFree(msglist); connList = taosHashIterate((SHashObj*)pool, connList); } @@ -1534,17 +1536,22 @@ static void doFreeTimeoutMsg(void* param) { } int32_t clConnMayUpdateReqCtx(SCliConn* pConn, SCliReq* pReq) { - int32_t code = 0; - int64_t qid = pReq->msg.info.qId; - SReqCtx* pCtx = pReq->ctx; + int32_t code = 0; + int64_t qid = pReq->msg.info.qId; + SReqCtx* pCtx = pReq->ctx; + SCliThrd* pThrd = pConn->hostThrd; + if (pCtx == NULL) { + tDebug("%s conn %p not need to update statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid); + return 0; + } STransCtx* pUserCtx = taosHashGet(pConn->pQTable, &qid, sizeof(qid)); if (pUserCtx == NULL) { code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx)); - tDebug("succ to add conn %p of statue ctx, qid:%ld", pConn, qid); + tDebug("%s conn %p add conn %p of statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid); } else { transCtxMerge(pUserCtx, &pCtx->userCtx); - tDebug("succ to update conn %p of statue ctx, qid:%ld", pConn, qid); + tDebug("%s conn %s update conn %p of statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid); } return 0; } @@ -1576,11 +1583,6 @@ int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) { if (qid == 0) { return TSDB_CODE_RPC_NO_STATE; } - SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid)); - if (pState != 0) { - tDebug("succ to get conn %p of statue, qid:%ld", pConn, qid); - ASSERT(0); - } SReqState state = {.conn = pConn, .arg = NULL}; code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state)); @@ -1915,6 +1917,9 @@ _err: } int32_t initCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { SCliThrd* pThrd = thrd; + if (pReq->ctx == NULL) { + return 0; + } return cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); } int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { @@ -3069,12 +3074,14 @@ int32_t transFreeConnById(void* pInstRef, int64_t transpointId) { } pCli->type = Normal; - tDebug("%s release conn id %" PRId64 "", pInst->label, transpointId); - STransMsg msg = {.msgType = TDMT_SCH_TASK_RELEASE, .info.handle = (void*)transpointId}; + TRACE_SET_MSGID(&msg.info.traceId, tGenIdPI64()); msg.info.qId = transpointId; pCli->msg = msg; + STraceId* trace = &pCli->msg.info.traceId; + tGDebug("%s start to free conn qid:%ld", pInst->label, transpointId); + code = transAsyncSend(pThrd->asyncPool, &pCli->q); if (code != 0) { taosMemoryFree(pCli);