From 5ae6b83d0e10a9bae709284a2e101788e9d836b3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 7 Sep 2024 20:08:48 +0800 Subject: [PATCH] opt transport --- source/libs/transport/src/transCli.c | 110 ++++++++++++++++----------- source/libs/transport/src/transSvr.c | 42 ++++++---- 2 files changed, 91 insertions(+), 61 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 90b47e1ef8..a11e362cb6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -209,7 +209,7 @@ static void cliSendBatchCb(uv_write_t* req, int status); SCliBatch* cliGetHeadFromList(SCliBatchList* pList); -static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); +// static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); static int32_t allocConnRef(SCliConn* conn, bool update); @@ -508,12 +508,15 @@ int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHe pResp->info.hasEpSet = pHead->hasEpSet; pResp->info.cliVer = htonl(pHead->compatibilityVer); pResp->info.seqNum = htonl(pHead->seqNum); + + int64_t qid = taosHton64(pHead->qid); + pResp->info.handle = (void*)qid; return 0; } int32_t cliConnMayHandleReleasReq(SCliConn* conn, STransMsgHead* pHead) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; - if (pHead->msgType == TDMT_SCH_TASK_RELEASE) { + if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) { int64_t qId = taosHton64(pHead->qid); code = taosHashRemove(conn->pQTable, &qId, sizeof(qId)); if (code != 0) { @@ -524,7 +527,8 @@ int32_t cliConnMayHandleReleasReq(SCliConn* conn, STransMsgHead* pHead) { if (code != 0) { tDebug("%s conn %p failed to release req %ld from thrd ", CONN_GET_INST_LABEL(conn), conn, qId); } - tDebug("%s conn %p release req %ld", CONN_GET_INST_LABEL(conn), conn, qId); + STraceId* trace = &pHead->traceId; + tGDebug("%s conn %p receive release req, qid:%ld", CONN_GET_INST_LABEL(conn), conn, qId); for (int32_t i = 0; i < transQueueSize(&conn->reqs); i++) { SCliReq* pReqs = transQueueGet(&conn->reqs, i); @@ -1957,9 +1961,11 @@ int32_t cliMayGetHandleState(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid)); if (pState == NULL) { + tDebug("failed to get statue, qid:%ld", qid); return TSDB_CODE_RPC_ASYNC_IN_PROCESS; } else { *pConn = pState->conn; + tDebug("succ to get conn of statue, qid:%ld", qid); } return code; } @@ -1972,10 +1978,24 @@ int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) { } SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid)); if (pState != 0) { + tDebug("succ to get conn %p of statue, qid:%ld", pConn, qid); ASSERT(0); } SReqState state = {.conn = pConn, .arg = NULL}; code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state)); + if (code != 0) { + tDebug("failed to add conn %p of statue, qid:%ld", pConn, qid); + } else { + tDebug("succ to add conn %p of statue, qid:%ld (1)", pConn, qid); + } + + int32_t dummy = 0; + code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &dummy, sizeof(dummy)); + if (code != 0) { + tDebug("failed to add conn %p of statue, qid:%ld", pConn, qid); + } else { + tDebug("succ to add conn %p of statue, qid:%ld(2)", pConn, qid); + } return code; } void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { @@ -2007,8 +2027,8 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { addConnToHeapCache(pThrd->connHeapCache, pConn); } } + code = cliMayUpdateState(pThrd, pReq, pConn); } - code = cliMayUpdateState(pThrd, pReq, pConn); code = cliSendReq(pConn, pReq); tTrace("%s conn %p ready", pInst->label, pConn); @@ -2258,51 +2278,50 @@ void cliConnFreeMsgs(SCliConn* conn) { } } -bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead) { - if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { - for (int i = 0; i < transQueueSize(&conn->reqs); i++) { - SCliReq* pReq = transQueueGet(&conn->reqs, i); - if (pHead->ahandle == (uint64_t)pReq->ctx->ahandle) { - tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); - transQueueRm(&conn->reqs, i); - return true; - } - } - } - return false; -} -bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { - if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { - uint64_t ahandle = pHead->ahandle; - SCliReq* pReq = NULL; - CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); - tDebug("%s conn %p receive release request, refId:%" PRId64 ", may ignore", CONN_GET_INST_LABEL(conn), conn, - conn->refId); +// bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead) { +// if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { +// for (int i = 0; i < transQueueSize(&conn->reqs); i++) { +// SCliReq* pReq = transQueueGet(&conn->reqs, i); +// if (pHead->ahandle == (uint64_t)pReq->ctx->ahandle) { +// tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, +// conn->refId); transQueueRm(&conn->reqs, i); return true; +// } +// } +// } +// return false; +// } +// bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { +// if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { +// uint64_t ahandle = pHead->ahandle; +// SCliReq* pReq = NULL; +// CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); +// tDebug("%s conn %p receive release request, refId:%" PRId64 ", may ignore", CONN_GET_INST_LABEL(conn), conn, +// conn->refId); - (void)transClearBuffer(&conn->readBuf); - transFreeMsg(transContFromHead((char*)pHead)); +// (void)transClearBuffer(&conn->readBuf); +// transFreeMsg(transContFromHead((char*)pHead)); - for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->reqs); i++) { - SCliReq* pReq = transQueueGet(&conn->reqs, i); - if (pReq->type == Release) { - ASSERTS(pReq == NULL, "trans-cli recv invaid release-req"); - tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn, - conn->refId); - cliDestroyConn(conn, true); - return true; - } - } +// for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->reqs); i++) { +// SCliReq* pReq = transQueueGet(&conn->reqs, i); +// if (pReq->type == Release) { +// ASSERTS(pReq == NULL, "trans-cli recv invaid release-req"); +// tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn, +// conn->refId); +// cliDestroyConn(conn, true); +// return true; +// } +// } - cliConnFreeMsgs(conn); +// cliConnFreeMsgs(conn); - tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); - destroyReq(pReq); +// tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); +// destroyReq(pReq); - addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); - return true; - } - return false; -} +// addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); +// return true; +// } +// return false; +// } static FORCE_INLINE void destroyReq(void* arg) { SCliReq* pReq = arg; @@ -3086,6 +3105,7 @@ int32_t transReleaseCliHandle(void* handle) { .info.handle = handle, .info.ahandle = (void*)0x9527, .info.qId = (int64_t)handle}; + TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx)); @@ -3518,7 +3538,7 @@ int32_t transAllocHandle(int64_t* refId) { QUEUE_INIT(&exh->q); taosInitRWLatch(&exh->latch); - tDebug("pre alloc refId %" PRId64 "", exh->refId); + tDebug("alloc qid:%ld", exh->refId); *refId = exh->refId; return 0; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index ad080f655a..e61b0750f9 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -390,21 +390,26 @@ static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* if (pConn->status == ConnNormal && pHead->noResp == 0) { // transRefSrvHandle(pConn); if (cost >= EXCEPTION_LIMIT_US) { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", transLabel(pInst), - pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost); + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%d, qid:%ld", + transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, + (int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId); } else { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pInst), pConn, - TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost); + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%d, qid:%ld", transLabel(pInst), + pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost, + pTransMsg->info.seqNum, pTransMsg->info.qId); } } else { if (cost >= EXCEPTION_LIMIT_US) { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception", - transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, - pHead->noResp, pTransMsg->code, (int)(cost)); + tGDebug( + "%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception, " + "seqNum:%d, qid:%ld", + transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, + pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId); } else { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus", transLabel(pInst), - pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, pHead->noResp, - pTransMsg->code, (int)(cost)); + tGDebug( + "%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%d, qid:%ld", + transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, + pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId); } } tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pInst), pTransMsg->info.handle, pConn, @@ -502,7 +507,6 @@ static bool uvHandleReq(SSvrConn* pConn) { transMsg.pCont = pHead->content; transMsg.msgType = pHead->msgType; transMsg.code = pHead->code; - transMsg.info.qId = taosHton64(pHead->qid); if (transMsg.info.qId > 0) { // int32_t code = taosHashPut(pConn->pQTable, &transMsg.info.qId, sizeof(int64_t), &transMsg, sizeof(STransMsg)); @@ -531,6 +535,7 @@ static bool uvHandleReq(SSvrConn* pConn) { transMsg.info.forbiddenIp = forbiddenIp; transMsg.info.noResp = pHead->noResp == 1 ? 1 : 0; transMsg.info.seqNum = htonl(pHead->seqNum); + transMsg.info.qId = taosHton64(pHead->qid); // uvMaySetConnAcquired(pConn, pHead); @@ -767,8 +772,8 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { } int32_t uvConnMayHandlsReleaseMsg(SSvrMsg* pMsg) { SSvrConn* pConn = pMsg->pConn; - if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE) { - int64_t qid = pMsg->msg.info.qId; + int64_t qid = pMsg->msg.info.qId; + if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) { SSvrRegArg* p = taosHashGet(pConn->pQTable, &qid, sizeof(qid)); if (p == NULL) { tError("%s conn %p already release qid %ld", transLabel(pConn->pInst), pConn, qid); @@ -1736,8 +1741,13 @@ int32_t transReleaseSrvHandle(void* handle) { SWorkThrd* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); - STransMsg tmsg = { - .msgType = TDMT_SCH_TASK_RELEASE, .code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId}; + STransMsg tmsg = {.msgType = TDMT_SCH_TASK_RELEASE, + .code = 0, + .info.handle = exh, + .info.ahandle = NULL, + .info.refId = refId, + .info.qId = qId, + .info.traceId = info->traceId}; SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); if (m == NULL) { @@ -1748,7 +1758,7 @@ int32_t transReleaseSrvHandle(void* handle) { m->msg = tmsg; m->type = Normal; - tDebug("%s conn %p start to %p, qId:%" PRId64 "", transLabel(pThrd->pInst), exh->handle, TMSG_INFO(tmsg.msgType), + tDebug("%s conn %p start to send %s, qid:%" PRId64 "", transLabel(pThrd->pInst), exh->handle, TMSG_INFO(tmsg.msgType), qId); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m);