opt parameter

This commit is contained in:
yihaoDeng 2024-09-13 15:04:04 +08:00
parent d3634f8999
commit 2c60ee27cc
2 changed files with 21 additions and 15 deletions

View File

@ -528,9 +528,8 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t index = pWrapper->sourceIndex; int32_t index = pWrapper->sourceIndex;
int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index); // int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
*pRpcHandle = -1;
SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index); SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
if (!pSourceDataInfo) { if (!pSourceDataInfo) {
return terrno; return terrno;

View File

@ -607,6 +607,8 @@ void* destroyConnPool(SCliThrd* pThrd) {
SCliConn* c = QUEUE_DATA(h, SCliConn, q); SCliConn* c = QUEUE_DATA(h, SCliConn, q);
cliDestroyConn(c, true); cliDestroyConn(c, true);
} }
SMsgList* msglist = connList->list;
taosMemoryFree(msglist);
connList = taosHashIterate((SHashObj*)pool, connList); connList = taosHashIterate((SHashObj*)pool, connList);
} }
@ -1534,17 +1536,22 @@ static void doFreeTimeoutMsg(void* param) {
} }
int32_t clConnMayUpdateReqCtx(SCliConn* pConn, SCliReq* pReq) { int32_t clConnMayUpdateReqCtx(SCliConn* pConn, SCliReq* pReq) {
int32_t code = 0; int32_t code = 0;
int64_t qid = pReq->msg.info.qId; int64_t qid = pReq->msg.info.qId;
SReqCtx* pCtx = pReq->ctx; SReqCtx* pCtx = pReq->ctx;
SCliThrd* pThrd = pConn->hostThrd;
if (pCtx == NULL) {
tDebug("%s conn %p not need to update statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid);
return 0;
}
STransCtx* pUserCtx = taosHashGet(pConn->pQTable, &qid, sizeof(qid)); STransCtx* pUserCtx = taosHashGet(pConn->pQTable, &qid, sizeof(qid));
if (pUserCtx == NULL) { if (pUserCtx == NULL) {
code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx)); code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx));
tDebug("succ to add conn %p of statue ctx, qid:%ld", pConn, qid); tDebug("%s conn %p add conn %p of statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid);
} else { } else {
transCtxMerge(pUserCtx, &pCtx->userCtx); transCtxMerge(pUserCtx, &pCtx->userCtx);
tDebug("succ to update conn %p of statue ctx, qid:%ld", pConn, qid); tDebug("%s conn %s update conn %p of statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid);
} }
return 0; return 0;
} }
@ -1576,11 +1583,6 @@ int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) {
if (qid == 0) { if (qid == 0) {
return TSDB_CODE_RPC_NO_STATE; return TSDB_CODE_RPC_NO_STATE;
} }
SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid));
if (pState != 0) {
tDebug("succ to get conn %p of statue, qid:%ld", pConn, qid);
ASSERT(0);
}
SReqState state = {.conn = pConn, .arg = NULL}; SReqState state = {.conn = pConn, .arg = NULL};
code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state)); code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state));
@ -1915,6 +1917,9 @@ _err:
} }
int32_t initCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { int32_t initCb(void* thrd, SCliReq* pReq, STransMsg* pResp) {
SCliThrd* pThrd = thrd; SCliThrd* pThrd = thrd;
if (pReq->ctx == NULL) {
return 0;
}
return cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); return cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr);
} }
int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) {
@ -3069,12 +3074,14 @@ int32_t transFreeConnById(void* pInstRef, int64_t transpointId) {
} }
pCli->type = Normal; pCli->type = Normal;
tDebug("%s release conn id %" PRId64 "", pInst->label, transpointId);
STransMsg msg = {.msgType = TDMT_SCH_TASK_RELEASE, .info.handle = (void*)transpointId}; STransMsg msg = {.msgType = TDMT_SCH_TASK_RELEASE, .info.handle = (void*)transpointId};
TRACE_SET_MSGID(&msg.info.traceId, tGenIdPI64());
msg.info.qId = transpointId; msg.info.qId = transpointId;
pCli->msg = msg; pCli->msg = msg;
STraceId* trace = &pCli->msg.info.traceId;
tGDebug("%s start to free conn qid:%ld", pInst->label, transpointId);
code = transAsyncSend(pThrd->asyncPool, &pCli->q); code = transAsyncSend(pThrd->asyncPool, &pCli->q);
if (code != 0) { if (code != 0) {
taosMemoryFree(pCli); taosMemoryFree(pCli);