From 947e1d2fc4cad7be40fada63a77aadcd32c3f229 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 1 Oct 2024 10:06:52 +0800 Subject: [PATCH] add config --- source/libs/transport/src/transCli.c | 50 +++++++++++++++------------ source/libs/transport/src/transComm.c | 2 +- source/libs/transport/src/transSvr.c | 34 +++++++++--------- 3 files changed, 45 insertions(+), 41 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c0b397b64b..59104d6908 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -509,7 +509,7 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead int64_t qId = taosHton64(pHead->qid); STraceId* trace = &pHead->traceId; int64_t seqNum = taosHton64(pHead->seqNum); - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%" PRId64 ", qid:%" PRId64 "", + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, seqNum, qId); @@ -565,7 +565,7 @@ int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, ST } STraceId* trace = &pHead->traceId; pResp->info.ahandle = transCtxDumpVal(pCtx, pHead->msgType); - tGDebug("%s conn %p %s received from %s, local info:%s, qid:%" PRId64 ", create ahandle %p by %s", + tGDebug("%s conn %p %s received from %s, local info:%s, sid:%" PRId64 ", create ahandle %p by %s", CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, qId, pResp->info.ahandle, TMSG_INFO(pHead->msgType)); return 0; @@ -624,7 +624,7 @@ void cliHandleResp(SCliConn* conn) { return; } if (code != 0) { - tWarn("%s conn %p recv unexpected packet, msgType:%s, seqNum:%" PRId64 ", qId:%" PRId64 + tWarn("%s conn %p recv unexpected packet, msgType:%s, seqNum:%" PRId64 ", sid:%" PRId64 ", the sever may sends repeated response since %s", CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), seq, qId, tstrerror(code)); // TODO: notify cb @@ -637,14 +637,14 @@ void cliHandleResp(SCliConn* conn) { } else { code = cliHandleState_mayUpdateStateTime(conn, pReq); if (code != 0) { - tDebug("%s conn %p failed to update state time qid:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId, + tDebug("%s conn %p failed to update state time sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId, tstrerror(code)); } } code = cliBuildRespFromCont(pReq, &resp, pHead); STraceId* trace = &resp.info.traceId; - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%" PRId64 ", qid:%" PRId64 "", + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId); code = cliNotifyCb(conn, pReq, &resp); @@ -803,6 +803,7 @@ static int32_t getOrCreateConnList(SCliThrd* pThrd, const char* key, SConnList** } QUEUE_INIT(&plist->conns); *ppList = plist; + tDebug("create conn list %p for key %s", plist, key); } else { *ppList = plist; } @@ -1073,7 +1074,7 @@ static void cliDestroyAllQidFromThrd(SCliConn* conn) { tDebug("%s conn %p failed to remove state %" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, *qid, tstrerror(code)); } else { - tDebug("%s conn %p destroy state %" PRId64 "", CONN_GET_INST_LABEL(conn), conn, *qid); + tDebug("%s conn %p destroy sid::%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, *qid); } STransCtx* ctx = pIter; @@ -1093,10 +1094,13 @@ static void cliDestroy(uv_handle_t* handle) { if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) { return; } + SCliConn* conn = handle->data; SCliThrd* pThrd = conn->hostThrd; cliResetConnTimer(conn); + tDebug("%s conn %p try to destroy", CONN_GET_INST_LABEL(conn), conn); + code = destroyAllReqs(conn); if (code != 0) { tDebug("%s conn %p failed to all reqs since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); @@ -1107,10 +1111,6 @@ static void cliDestroy(uv_handle_t* handle) { tDebug("%s conn %p failed to del conn from heapcach since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); } - if (conn->list) { - conn->list->totaSize -= 1; - conn->list = NULL; - } taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->stream); taosMemoryFree(conn->ipStr); @@ -1392,7 +1392,7 @@ int32_t cliBatchSend(SCliConn* pConn) { pCliMsg->seq = pConn->seq; STraceId* trace = &pCliMsg->msg.info.traceId; - tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", qid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), + tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); } @@ -1774,7 +1774,7 @@ int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) { SReqCtx* pCtx = pReq->ctx; SCliThrd* pThrd = pConn->hostThrd; if (pCtx == NULL) { - tDebug("%s conn %p not need to update statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p not need to update statue ctx, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); return 0; } @@ -1782,11 +1782,11 @@ int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) { if (pUserCtx == NULL) { pCtx->userCtx.st = taosGetTimestampUs(); code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx)); - tDebug("%s conn %p succ to add statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p succ to add statue ctx, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); } else { transCtxMerge(pUserCtx, &pCtx->userCtx); pUserCtx->st = taosGetTimestampUs(); - tDebug("%s conn %p succ to update statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p succ to update statue ctx, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); } return 0; } @@ -1809,12 +1809,12 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { transReleaseExHandle(transGetRefMgt(), qid); return TSDB_CODE_RPC_STATE_DROPED; } - tDebug("%s conn %p failed to get statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p failed to get statue, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); transReleaseExHandle(transGetRefMgt(), qid); return TSDB_CODE_RPC_ASYNC_IN_PROCESS; } else { *pConn = pState->conn; - tDebug("%s conn %p succ to get conn of statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p succ to get conn of statue, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); } transReleaseExHandle(transGetRefMgt(), qid); return 0; @@ -1832,9 +1832,9 @@ int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq) { SReqState state = {.conn = pConn, .arg = NULL}; code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state)); if (code != 0) { - tDebug("%s conn %p failed to statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p failed to statue, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); } else { - tDebug("%s conn %p succ to add statue, qid:%" PRId64 " (1)", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p succ to add statue, sid:%" PRId64 " (1)", transLabel(pThrd->pInst), pConn, qid); } TAOS_UNUSED(cliHandleState_mayUpdateStateCtx(pConn, pReq)); @@ -3107,6 +3107,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg* transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); } + transReleaseExHandle(transGetRefMgt(), *transpointId); transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return 0; @@ -3438,7 +3439,7 @@ int32_t transAllocHandle(int64_t* refId) { QUEUE_INIT(&exh->q); taosInitRWLatch(&exh->latch); - tDebug("trans alloc qid:%" PRId64 ", malloc:%p", exh->refId, exh); + tDebug("trans alloc sid:%" PRId64 ", malloc:%p", exh->refId, exh); *refId = exh->refId; return 0; } @@ -3470,7 +3471,7 @@ int32_t transFreeConnById(void* pInstRef, int64_t transpointId) { pCli->msg = msg; STraceId* trace = &pCli->msg.info.traceId; - tGDebug("%s start to free conn qid:%" PRId64 "", pInst->label, transpointId); + tGDebug("%s start to free conn sid:%" PRId64 "", pInst->label, transpointId); code = transAsyncSend(pThrd->asyncPool, &pCli->q); if (code != 0) { @@ -3544,7 +3545,7 @@ static void cliConnRemoveTimoutQidMsg(SCliConn* pConn, int64_t* st, queue* set) if (((*st - pCtx->st) / 1000000) > pInst->readTimeout) { code = taosHashRemove(pThrd->pIdConnTable, qid, sizeof(*qid)); if (code != 0) { - tError("%s conn %p failed to remove state qid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid, + tError("%s conn %p failed to remove state sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid, tstrerror(code)); } @@ -3553,7 +3554,7 @@ static void cliConnRemoveTimoutQidMsg(SCliConn* pConn, int64_t* st, queue* set) if (taosArrayPush(pQIdBuf, qid) == NULL) { code = terrno; - tError("%s conn %p failed to add qid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid, + tError("%s conn %p failed to add sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid, tstrerror(code)); break; } @@ -3570,7 +3571,7 @@ static void cliConnRemoveTimoutQidMsg(SCliConn* pConn, int64_t* st, queue* set) transCtxCleanup(p); code = taosHashRemove(pConn->pQTable, qid, sizeof(*qid)); if (code != 0) { - tError("%s conn %p failed to drop ctx of qid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid, + tError("%s conn %p failed to drop ctx of sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid, tstrerror(code)); } } @@ -3618,6 +3619,9 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) { if (stateNum >= pInst->shareConnLimit || totalReqs >= pInst->shareConnLimit) { if (pConn->list == NULL && pConn->dstAddr != NULL) { pConn->list = taosHashGet((SHashObj*)pThrd->pool, pConn->dstAddr, strlen(pConn->dstAddr)); + if (pConn->list != NULL) { + tDebug("conn %p get list %p from pool for key:%s", pConn, pConn->list, key); + } } if (pConn->list && pConn->list->totaSize >= pInst->connLimitNum / 4) { tWarn("%s conn %p try to remove timeout msg since too many conn created", transLabel(pInst), pConn); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index a149a32d96..20adaebf84 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -764,7 +764,7 @@ void transDestroyExHandle(void* handle) { return; } SExHandle* eh = handle; - tDebug("trans destroy qid:%" PRId64 ", memory %p", eh->refId, handle); + tDebug("trans destroy sid:%" PRId64 ", memory %p", eh->refId, handle); taosMemoryFree(handle); } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index c13ec1ae4e..d67e908e2a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -394,11 +394,11 @@ static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* if (pConn->status == ConnNormal && pHead->noResp == 0) { if (cost >= EXCEPTION_LIMIT_US) { tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%" PRId64 - ", qid:%" PRId64 "", + ", sid:%" PRId64 "", transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId); } else { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%" PRId64 ", qid:%" PRId64 "", + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%" PRId64 ", sid:%" PRId64 "", transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId); } @@ -406,13 +406,13 @@ static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* if (cost >= EXCEPTION_LIMIT_US) { tGDebug( "%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception, " - "seqNum:%" PRId64 ", qid:%" PRId64 "", + "seqNum:%" PRId64 ", sid:%" PRId64 "", transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId); } else { tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%" PRId64 ", " - "qid:%" PRId64 "", + "sid:%" PRId64 "", transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId); } @@ -440,23 +440,23 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { if (pHead->msgType == TDMT_SCH_TASK_RELEASE) { int64_t qId = taosHton64(pHead->qid); if (qId <= 0) { - tError("conn %p recv release, but invalid qid:%" PRId64 "", pConn, qId); + tError("conn %p recv release, but invalid sid:%" PRId64 "", pConn, qId); code = TSDB_CODE_RPC_NO_STATE; } else { void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId)); if (p == NULL) { code = TSDB_CODE_RPC_NO_STATE; - tTrace("conn %p recv release, and releady release by server qid:%" PRId64 "", pConn, qId); + tTrace("conn %p recv release, and releady release by server sid:%" PRId64 "", pConn, qId); } else { SSvrRegArg* arg = p; (pInst->cfp)(pInst->parent, &(arg->msg), NULL); - tTrace("conn %p recv release, notify server app, qid:%" PRId64 "", pConn, qId); + tTrace("conn %p recv release, notify server app, sid:%" PRId64 "", pConn, qId); code = taosHashRemove(pConn->pQTable, &qId, sizeof(qId)); if (code != 0) { - tDebug("conn %p failed to remove qid:%" PRId64 "", pConn, qId); + tDebug("conn %p failed to remove sid:%" PRId64 "", pConn, qId); } - tTrace("conn %p clear state,qid:%" PRId64 "", pConn, qId); + tTrace("conn %p clear state,sid:%" PRId64 "", pConn, qId); } } @@ -680,7 +680,7 @@ void uvOnSendCb(uv_write_t* req, int status) { SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q); STraceId* trace = &smsg->msg.info.traceId; - tGDebug("%s conn %p msg already send out, seqNum:%" PRId64 ", qid:%" PRId64 "", transLabel(conn->pInst), conn, + tGDebug("%s conn %p msg already send out, seqNum:%" PRId64 ", sid:%" PRId64 "", transLabel(conn->pInst), conn, smsg->msg.info.seqNum, smsg->msg.info.qId); destroySmsg(smsg); } @@ -691,7 +691,7 @@ void uvOnSendCb(uv_write_t* req, int status) { SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q); STraceId* trace = &smsg->msg.info.traceId; - tGDebug("%s conn %p failed to send, seqNum:%" PRId64 ", qid:%" PRId64 " since %s", transLabel(conn->pInst), conn, + tGDebug("%s conn %p failed to send, seqNum:%" PRId64 ", sid:%" PRId64 " since %s", transLabel(conn->pInst), conn, smsg->msg.info.seqNum, smsg->msg.info.qId, uv_err_name(status)); destroySmsg(smsg); } @@ -759,7 +759,7 @@ static int32_t uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) { } STraceId* trace = &pMsg->info.traceId; - tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d, seqNum:%" PRId64 ", qid:%" PRId64 "", transLabel(pInst), + tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d, seqNum:%" PRId64 ", sid:%" PRId64 "", transLabel(pInst), pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len, pMsg->info.seqNum, pMsg->info.qId); wb->base = (char*)pHead; @@ -865,13 +865,13 @@ int32_t uvMayHandleReleaseResp(SSvrRespMsg* pMsg) { if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) { SSvrRegArg* p = taosHashGet(pConn->pQTable, &qid, sizeof(qid)); if (p == NULL) { - tError("%s conn %p already release qid:%" PRId64 "", transLabel(pConn->pInst), pConn, qid); + tError("%s conn %p already release sid:%" PRId64 "", transLabel(pConn->pInst), pConn, qid); return TSDB_CODE_RPC_NO_STATE; } else { transFreeMsg(p->msg.pCont); code = taosHashRemove(pConn->pQTable, &qid, sizeof(qid)); if (code != 0) { - tError("%s conn %p failed to release qid:%" PRId64 " since %s", transLabel(pConn->pInst), pConn, qid, + tError("%s conn %p failed to release sid:%" PRId64 " since %s", transLabel(pConn->pInst), pConn, qid, tstrerror(code)); } } @@ -1393,7 +1393,7 @@ void uvConnDestroyAllState(SSvrConn* p) { SSvrRegArg* arg = pIter; int64_t* qid = taosHashGetKey(pIter, NULL); (pInst->cfp)(pInst->parent, &(arg->msg), NULL); - tTrace("conn %p broken, notify server app, qid:%" PRId64 "", p, *qid); + tTrace("conn %p broken, notify server app, sid:%" PRId64 "", p, *qid); pIter = taosHashIterate(pQTable, pIter); } @@ -1698,7 +1698,7 @@ int32_t uvHandleStateReq(SSvrRespMsg* msg) { int32_t code = 0; SSvrConn* conn = msg->pConn; int64_t qid = msg->msg.info.qId; - tDebug("%s conn %p start to register brokenlink callback, qid:%" PRId64 "", transLabel(conn->pInst), conn, qid); + tDebug("%s conn %p start to register brokenlink callback, sid:%" PRId64 "", transLabel(conn->pInst), conn, qid); SSvrRegArg arg = {.notifyCount = 0, .init = 1, .msg = msg->msg}; SSvrRegArg* p = taosHashGet(conn->pQTable, &qid, sizeof(qid)); @@ -1875,7 +1875,7 @@ int32_t transReleaseSrvHandle(void* handle) { m->msg = tmsg; m->type = Normal; - tDebug("%s conn %p start to send %s, qid:%" PRId64 "", transLabel(pThrd->pInst), exh->handle, TMSG_INFO(tmsg.msgType), + tDebug("%s conn %p start to send %s, sid:%" PRId64 "", transLabel(pThrd->pInst), exh->handle, TMSG_INFO(tmsg.msgType), qId); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m);