add config

This commit is contained in:
yihaoDeng 2024-10-01 10:06:52 +08:00
parent 60718d0e38
commit 947e1d2fc4
3 changed files with 45 additions and 41 deletions

View File

@ -509,7 +509,7 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead
int64_t qId = taosHton64(pHead->qid); int64_t qId = taosHton64(pHead->qid);
STraceId* trace = &pHead->traceId; STraceId* trace = &pHead->traceId;
int64_t seqNum = taosHton64(pHead->seqNum); int64_t seqNum = taosHton64(pHead->seqNum);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%" PRId64 ", qid:%" PRId64 "", tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%" PRId64 ", sid:%" PRId64 "",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, seqNum, CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, seqNum,
qId); qId);
@ -565,7 +565,7 @@ int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, ST
} }
STraceId* trace = &pHead->traceId; STraceId* trace = &pHead->traceId;
pResp->info.ahandle = transCtxDumpVal(pCtx, pHead->msgType); pResp->info.ahandle = transCtxDumpVal(pCtx, pHead->msgType);
tGDebug("%s conn %p %s received from %s, local info:%s, qid:%" PRId64 ", create ahandle %p by %s", tGDebug("%s conn %p %s received from %s, local info:%s, sid:%" PRId64 ", create ahandle %p by %s",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, qId, pResp->info.ahandle, CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, qId, pResp->info.ahandle,
TMSG_INFO(pHead->msgType)); TMSG_INFO(pHead->msgType));
return 0; return 0;
@ -624,7 +624,7 @@ void cliHandleResp(SCliConn* conn) {
return; return;
} }
if (code != 0) { if (code != 0) {
tWarn("%s conn %p recv unexpected packet, msgType:%s, seqNum:%" PRId64 ", qId:%" PRId64 tWarn("%s conn %p recv unexpected packet, msgType:%s, seqNum:%" PRId64 ", sid:%" PRId64
", the sever may sends repeated response since %s", ", the sever may sends repeated response since %s",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), seq, qId, tstrerror(code)); CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), seq, qId, tstrerror(code));
// TODO: notify cb // TODO: notify cb
@ -637,14 +637,14 @@ void cliHandleResp(SCliConn* conn) {
} else { } else {
code = cliHandleState_mayUpdateStateTime(conn, pReq); code = cliHandleState_mayUpdateStateTime(conn, pReq);
if (code != 0) { if (code != 0) {
tDebug("%s conn %p failed to update state time qid:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId, tDebug("%s conn %p failed to update state time sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId,
tstrerror(code)); tstrerror(code));
} }
} }
code = cliBuildRespFromCont(pReq, &resp, pHead); code = cliBuildRespFromCont(pReq, &resp, pHead);
STraceId* trace = &resp.info.traceId; STraceId* trace = &resp.info.traceId;
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%" PRId64 ", qid:%" PRId64 "", tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%" PRId64 ", sid:%" PRId64 "",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId); CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId);
code = cliNotifyCb(conn, pReq, &resp); code = cliNotifyCb(conn, pReq, &resp);
@ -803,6 +803,7 @@ static int32_t getOrCreateConnList(SCliThrd* pThrd, const char* key, SConnList**
} }
QUEUE_INIT(&plist->conns); QUEUE_INIT(&plist->conns);
*ppList = plist; *ppList = plist;
tDebug("create conn list %p for key %s", plist, key);
} else { } else {
*ppList = plist; *ppList = plist;
} }
@ -1073,7 +1074,7 @@ static void cliDestroyAllQidFromThrd(SCliConn* conn) {
tDebug("%s conn %p failed to remove state %" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, *qid, tDebug("%s conn %p failed to remove state %" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, *qid,
tstrerror(code)); tstrerror(code));
} else { } else {
tDebug("%s conn %p destroy state %" PRId64 "", CONN_GET_INST_LABEL(conn), conn, *qid); tDebug("%s conn %p destroy sid::%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, *qid);
} }
STransCtx* ctx = pIter; STransCtx* ctx = pIter;
@ -1093,10 +1094,13 @@ static void cliDestroy(uv_handle_t* handle) {
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) { if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
return; return;
} }
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
cliResetConnTimer(conn); cliResetConnTimer(conn);
tDebug("%s conn %p try to destroy", CONN_GET_INST_LABEL(conn), conn);
code = destroyAllReqs(conn); code = destroyAllReqs(conn);
if (code != 0) { if (code != 0) {
tDebug("%s conn %p failed to all reqs since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); tDebug("%s conn %p failed to all reqs since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
@ -1107,10 +1111,6 @@ static void cliDestroy(uv_handle_t* handle) {
tDebug("%s conn %p failed to del conn from heapcach since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); tDebug("%s conn %p failed to del conn from heapcach since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
} }
if (conn->list) {
conn->list->totaSize -= 1;
conn->list = NULL;
}
taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->dstAddr);
taosMemoryFree(conn->stream); taosMemoryFree(conn->stream);
taosMemoryFree(conn->ipStr); taosMemoryFree(conn->ipStr);
@ -1392,7 +1392,7 @@ int32_t cliBatchSend(SCliConn* pConn) {
pCliMsg->seq = pConn->seq; pCliMsg->seq = pConn->seq;
STraceId* trace = &pCliMsg->msg.info.traceId; STraceId* trace = &pCliMsg->msg.info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", qid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn),
pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
} }
@ -1774,7 +1774,7 @@ int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) {
SReqCtx* pCtx = pReq->ctx; SReqCtx* pCtx = pReq->ctx;
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
if (pCtx == NULL) { if (pCtx == NULL) {
tDebug("%s conn %p not need to update statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); tDebug("%s conn %p not need to update statue ctx, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
return 0; return 0;
} }
@ -1782,11 +1782,11 @@ int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) {
if (pUserCtx == NULL) { if (pUserCtx == NULL) {
pCtx->userCtx.st = taosGetTimestampUs(); pCtx->userCtx.st = taosGetTimestampUs();
code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx)); code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx));
tDebug("%s conn %p succ to add statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); tDebug("%s conn %p succ to add statue ctx, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
} else { } else {
transCtxMerge(pUserCtx, &pCtx->userCtx); transCtxMerge(pUserCtx, &pCtx->userCtx);
pUserCtx->st = taosGetTimestampUs(); pUserCtx->st = taosGetTimestampUs();
tDebug("%s conn %p succ to update statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); tDebug("%s conn %p succ to update statue ctx, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
} }
return 0; return 0;
} }
@ -1809,12 +1809,12 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
transReleaseExHandle(transGetRefMgt(), qid); transReleaseExHandle(transGetRefMgt(), qid);
return TSDB_CODE_RPC_STATE_DROPED; return TSDB_CODE_RPC_STATE_DROPED;
} }
tDebug("%s conn %p failed to get statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); tDebug("%s conn %p failed to get statue, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
transReleaseExHandle(transGetRefMgt(), qid); transReleaseExHandle(transGetRefMgt(), qid);
return TSDB_CODE_RPC_ASYNC_IN_PROCESS; return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
} else { } else {
*pConn = pState->conn; *pConn = pState->conn;
tDebug("%s conn %p succ to get conn of statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); tDebug("%s conn %p succ to get conn of statue, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
} }
transReleaseExHandle(transGetRefMgt(), qid); transReleaseExHandle(transGetRefMgt(), qid);
return 0; return 0;
@ -1832,9 +1832,9 @@ int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq) {
SReqState state = {.conn = pConn, .arg = NULL}; SReqState state = {.conn = pConn, .arg = NULL};
code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state)); code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state));
if (code != 0) { if (code != 0) {
tDebug("%s conn %p failed to statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); tDebug("%s conn %p failed to statue, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
} else { } else {
tDebug("%s conn %p succ to add statue, qid:%" PRId64 " (1)", transLabel(pThrd->pInst), pConn, qid); tDebug("%s conn %p succ to add statue, sid:%" PRId64 " (1)", transLabel(pThrd->pInst), pConn, qid);
} }
TAOS_UNUSED(cliHandleState_mayUpdateStateCtx(pConn, pReq)); TAOS_UNUSED(cliHandleState_mayUpdateStateCtx(pConn, pReq));
@ -3107,6 +3107,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
} }
transReleaseExHandle(transGetRefMgt(), *transpointId);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
return 0; return 0;
@ -3438,7 +3439,7 @@ int32_t transAllocHandle(int64_t* refId) {
QUEUE_INIT(&exh->q); QUEUE_INIT(&exh->q);
taosInitRWLatch(&exh->latch); taosInitRWLatch(&exh->latch);
tDebug("trans alloc qid:%" PRId64 ", malloc:%p", exh->refId, exh); tDebug("trans alloc sid:%" PRId64 ", malloc:%p", exh->refId, exh);
*refId = exh->refId; *refId = exh->refId;
return 0; return 0;
} }
@ -3470,7 +3471,7 @@ int32_t transFreeConnById(void* pInstRef, int64_t transpointId) {
pCli->msg = msg; pCli->msg = msg;
STraceId* trace = &pCli->msg.info.traceId; STraceId* trace = &pCli->msg.info.traceId;
tGDebug("%s start to free conn qid:%" PRId64 "", pInst->label, transpointId); tGDebug("%s start to free conn sid:%" PRId64 "", pInst->label, transpointId);
code = transAsyncSend(pThrd->asyncPool, &pCli->q); code = transAsyncSend(pThrd->asyncPool, &pCli->q);
if (code != 0) { if (code != 0) {
@ -3544,7 +3545,7 @@ static void cliConnRemoveTimoutQidMsg(SCliConn* pConn, int64_t* st, queue* set)
if (((*st - pCtx->st) / 1000000) > pInst->readTimeout) { if (((*st - pCtx->st) / 1000000) > pInst->readTimeout) {
code = taosHashRemove(pThrd->pIdConnTable, qid, sizeof(*qid)); code = taosHashRemove(pThrd->pIdConnTable, qid, sizeof(*qid));
if (code != 0) { if (code != 0) {
tError("%s conn %p failed to remove state qid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid, tError("%s conn %p failed to remove state sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid,
tstrerror(code)); tstrerror(code));
} }
@ -3553,7 +3554,7 @@ static void cliConnRemoveTimoutQidMsg(SCliConn* pConn, int64_t* st, queue* set)
if (taosArrayPush(pQIdBuf, qid) == NULL) { if (taosArrayPush(pQIdBuf, qid) == NULL) {
code = terrno; code = terrno;
tError("%s conn %p failed to add qid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid, tError("%s conn %p failed to add sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid,
tstrerror(code)); tstrerror(code));
break; break;
} }
@ -3570,7 +3571,7 @@ static void cliConnRemoveTimoutQidMsg(SCliConn* pConn, int64_t* st, queue* set)
transCtxCleanup(p); transCtxCleanup(p);
code = taosHashRemove(pConn->pQTable, qid, sizeof(*qid)); code = taosHashRemove(pConn->pQTable, qid, sizeof(*qid));
if (code != 0) { if (code != 0) {
tError("%s conn %p failed to drop ctx of qid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid, tError("%s conn %p failed to drop ctx of sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid,
tstrerror(code)); tstrerror(code));
} }
} }
@ -3618,6 +3619,9 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) {
if (stateNum >= pInst->shareConnLimit || totalReqs >= pInst->shareConnLimit) { if (stateNum >= pInst->shareConnLimit || totalReqs >= pInst->shareConnLimit) {
if (pConn->list == NULL && pConn->dstAddr != NULL) { if (pConn->list == NULL && pConn->dstAddr != NULL) {
pConn->list = taosHashGet((SHashObj*)pThrd->pool, pConn->dstAddr, strlen(pConn->dstAddr)); pConn->list = taosHashGet((SHashObj*)pThrd->pool, pConn->dstAddr, strlen(pConn->dstAddr));
if (pConn->list != NULL) {
tDebug("conn %p get list %p from pool for key:%s", pConn, pConn->list, key);
}
} }
if (pConn->list && pConn->list->totaSize >= pInst->connLimitNum / 4) { if (pConn->list && pConn->list->totaSize >= pInst->connLimitNum / 4) {
tWarn("%s conn %p try to remove timeout msg since too many conn created", transLabel(pInst), pConn); tWarn("%s conn %p try to remove timeout msg since too many conn created", transLabel(pInst), pConn);

View File

@ -764,7 +764,7 @@ void transDestroyExHandle(void* handle) {
return; return;
} }
SExHandle* eh = handle; SExHandle* eh = handle;
tDebug("trans destroy qid:%" PRId64 ", memory %p", eh->refId, handle); tDebug("trans destroy sid:%" PRId64 ", memory %p", eh->refId, handle);
taosMemoryFree(handle); taosMemoryFree(handle);
} }

View File

@ -394,11 +394,11 @@ static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg*
if (pConn->status == ConnNormal && pHead->noResp == 0) { if (pConn->status == ConnNormal && pHead->noResp == 0) {
if (cost >= EXCEPTION_LIMIT_US) { if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%" PRId64 tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%" PRId64
", qid:%" PRId64 "", ", sid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
(int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId); (int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId);
} else { } else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%" PRId64 ", qid:%" PRId64 "", tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%" PRId64 ", sid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
(int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId); (int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId);
} }
@ -406,13 +406,13 @@ static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg*
if (cost >= EXCEPTION_LIMIT_US) { if (cost >= EXCEPTION_LIMIT_US) {
tGDebug( tGDebug(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception, " "%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception, "
"seqNum:%" PRId64 ", qid:%" PRId64 "", "seqNum:%" PRId64 ", sid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId); pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId);
} else { } else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%" PRId64 tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%" PRId64
", " ", "
"qid:%" PRId64 "", "sid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId); pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId);
} }
@ -440,23 +440,23 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
if (pHead->msgType == TDMT_SCH_TASK_RELEASE) { if (pHead->msgType == TDMT_SCH_TASK_RELEASE) {
int64_t qId = taosHton64(pHead->qid); int64_t qId = taosHton64(pHead->qid);
if (qId <= 0) { if (qId <= 0) {
tError("conn %p recv release, but invalid qid:%" PRId64 "", pConn, qId); tError("conn %p recv release, but invalid sid:%" PRId64 "", pConn, qId);
code = TSDB_CODE_RPC_NO_STATE; code = TSDB_CODE_RPC_NO_STATE;
} else { } else {
void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId)); void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId));
if (p == NULL) { if (p == NULL) {
code = TSDB_CODE_RPC_NO_STATE; code = TSDB_CODE_RPC_NO_STATE;
tTrace("conn %p recv release, and releady release by server qid:%" PRId64 "", pConn, qId); tTrace("conn %p recv release, and releady release by server sid:%" PRId64 "", pConn, qId);
} else { } else {
SSvrRegArg* arg = p; SSvrRegArg* arg = p;
(pInst->cfp)(pInst->parent, &(arg->msg), NULL); (pInst->cfp)(pInst->parent, &(arg->msg), NULL);
tTrace("conn %p recv release, notify server app, qid:%" PRId64 "", pConn, qId); tTrace("conn %p recv release, notify server app, sid:%" PRId64 "", pConn, qId);
code = taosHashRemove(pConn->pQTable, &qId, sizeof(qId)); code = taosHashRemove(pConn->pQTable, &qId, sizeof(qId));
if (code != 0) { if (code != 0) {
tDebug("conn %p failed to remove qid:%" PRId64 "", pConn, qId); tDebug("conn %p failed to remove sid:%" PRId64 "", pConn, qId);
} }
tTrace("conn %p clear state,qid:%" PRId64 "", pConn, qId); tTrace("conn %p clear state,sid:%" PRId64 "", pConn, qId);
} }
} }
@ -680,7 +680,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q); SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q);
STraceId* trace = &smsg->msg.info.traceId; STraceId* trace = &smsg->msg.info.traceId;
tGDebug("%s conn %p msg already send out, seqNum:%" PRId64 ", qid:%" PRId64 "", transLabel(conn->pInst), conn, tGDebug("%s conn %p msg already send out, seqNum:%" PRId64 ", sid:%" PRId64 "", transLabel(conn->pInst), conn,
smsg->msg.info.seqNum, smsg->msg.info.qId); smsg->msg.info.seqNum, smsg->msg.info.qId);
destroySmsg(smsg); destroySmsg(smsg);
} }
@ -691,7 +691,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q); SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q);
STraceId* trace = &smsg->msg.info.traceId; STraceId* trace = &smsg->msg.info.traceId;
tGDebug("%s conn %p failed to send, seqNum:%" PRId64 ", qid:%" PRId64 " since %s", transLabel(conn->pInst), conn, tGDebug("%s conn %p failed to send, seqNum:%" PRId64 ", sid:%" PRId64 " since %s", transLabel(conn->pInst), conn,
smsg->msg.info.seqNum, smsg->msg.info.qId, uv_err_name(status)); smsg->msg.info.seqNum, smsg->msg.info.qId, uv_err_name(status));
destroySmsg(smsg); destroySmsg(smsg);
} }
@ -759,7 +759,7 @@ static int32_t uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) {
} }
STraceId* trace = &pMsg->info.traceId; STraceId* trace = &pMsg->info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d, seqNum:%" PRId64 ", qid:%" PRId64 "", transLabel(pInst), tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d, seqNum:%" PRId64 ", sid:%" PRId64 "", transLabel(pInst),
pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len, pMsg->info.seqNum, pMsg->info.qId); pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len, pMsg->info.seqNum, pMsg->info.qId);
wb->base = (char*)pHead; wb->base = (char*)pHead;
@ -865,13 +865,13 @@ int32_t uvMayHandleReleaseResp(SSvrRespMsg* pMsg) {
if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) { if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) {
SSvrRegArg* p = taosHashGet(pConn->pQTable, &qid, sizeof(qid)); SSvrRegArg* p = taosHashGet(pConn->pQTable, &qid, sizeof(qid));
if (p == NULL) { if (p == NULL) {
tError("%s conn %p already release qid:%" PRId64 "", transLabel(pConn->pInst), pConn, qid); tError("%s conn %p already release sid:%" PRId64 "", transLabel(pConn->pInst), pConn, qid);
return TSDB_CODE_RPC_NO_STATE; return TSDB_CODE_RPC_NO_STATE;
} else { } else {
transFreeMsg(p->msg.pCont); transFreeMsg(p->msg.pCont);
code = taosHashRemove(pConn->pQTable, &qid, sizeof(qid)); code = taosHashRemove(pConn->pQTable, &qid, sizeof(qid));
if (code != 0) { if (code != 0) {
tError("%s conn %p failed to release qid:%" PRId64 " since %s", transLabel(pConn->pInst), pConn, qid, tError("%s conn %p failed to release sid:%" PRId64 " since %s", transLabel(pConn->pInst), pConn, qid,
tstrerror(code)); tstrerror(code));
} }
} }
@ -1393,7 +1393,7 @@ void uvConnDestroyAllState(SSvrConn* p) {
SSvrRegArg* arg = pIter; SSvrRegArg* arg = pIter;
int64_t* qid = taosHashGetKey(pIter, NULL); int64_t* qid = taosHashGetKey(pIter, NULL);
(pInst->cfp)(pInst->parent, &(arg->msg), NULL); (pInst->cfp)(pInst->parent, &(arg->msg), NULL);
tTrace("conn %p broken, notify server app, qid:%" PRId64 "", p, *qid); tTrace("conn %p broken, notify server app, sid:%" PRId64 "", p, *qid);
pIter = taosHashIterate(pQTable, pIter); pIter = taosHashIterate(pQTable, pIter);
} }
@ -1698,7 +1698,7 @@ int32_t uvHandleStateReq(SSvrRespMsg* msg) {
int32_t code = 0; int32_t code = 0;
SSvrConn* conn = msg->pConn; SSvrConn* conn = msg->pConn;
int64_t qid = msg->msg.info.qId; int64_t qid = msg->msg.info.qId;
tDebug("%s conn %p start to register brokenlink callback, qid:%" PRId64 "", transLabel(conn->pInst), conn, qid); tDebug("%s conn %p start to register brokenlink callback, sid:%" PRId64 "", transLabel(conn->pInst), conn, qid);
SSvrRegArg arg = {.notifyCount = 0, .init = 1, .msg = msg->msg}; SSvrRegArg arg = {.notifyCount = 0, .init = 1, .msg = msg->msg};
SSvrRegArg* p = taosHashGet(conn->pQTable, &qid, sizeof(qid)); SSvrRegArg* p = taosHashGet(conn->pQTable, &qid, sizeof(qid));
@ -1875,7 +1875,7 @@ int32_t transReleaseSrvHandle(void* handle) {
m->msg = tmsg; m->msg = tmsg;
m->type = Normal; m->type = Normal;
tDebug("%s conn %p start to send %s, qid:%" PRId64 "", transLabel(pThrd->pInst), exh->handle, TMSG_INFO(tmsg.msgType), tDebug("%s conn %p start to send %s, sid:%" PRId64 "", transLabel(pThrd->pInst), exh->handle, TMSG_INFO(tmsg.msgType),
qId); qId);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m); destroySmsg(m);