From 488cccd10e7481353e815dc4cd2702621b778ab1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 8 Sep 2024 06:58:44 +0800 Subject: [PATCH] opt transport --- source/libs/transport/src/transCli.c | 61 +++++---------------- source/libs/transport/src/transSvr.c | 82 ++++++++++++++-------------- 2 files changed, 53 insertions(+), 90 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index a11e362cb6..502cc71cf2 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1335,6 +1335,16 @@ static void cliDestroy(uv_handle_t* handle) { taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->stream); + void* pIter = taosHashIterate(conn->pQTable, NULL); + while (pIter) { + int64_t qid = *(int64_t*)pIter; + (void)taosHashRemove(pThrd->pIdConnTable, &qid, sizeof(qid)); + + pIter = taosHashIterate(conn->pQTable, pIter); + + tDebug("%s conn %p destroy state %ld", CONN_GET_INST_LABEL(conn), conn, qid); + } + cliDestroyConnMsgs(conn, true); tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); @@ -2278,51 +2288,6 @@ void cliConnFreeMsgs(SCliConn* conn) { } } -// bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead) { -// if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { -// for (int i = 0; i < transQueueSize(&conn->reqs); i++) { -// SCliReq* pReq = transQueueGet(&conn->reqs, i); -// if (pHead->ahandle == (uint64_t)pReq->ctx->ahandle) { -// tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, -// conn->refId); transQueueRm(&conn->reqs, i); return true; -// } -// } -// } -// return false; -// } -// bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { -// if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { -// uint64_t ahandle = pHead->ahandle; -// SCliReq* pReq = NULL; -// CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); -// tDebug("%s conn %p receive release request, refId:%" PRId64 ", may ignore", CONN_GET_INST_LABEL(conn), conn, -// conn->refId); - -// (void)transClearBuffer(&conn->readBuf); -// transFreeMsg(transContFromHead((char*)pHead)); - -// for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->reqs); i++) { -// SCliReq* pReq = transQueueGet(&conn->reqs, i); -// if (pReq->type == Release) { -// ASSERTS(pReq == NULL, "trans-cli recv invaid release-req"); -// tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn, -// conn->refId); -// cliDestroyConn(conn, true); -// return true; -// } -// } - -// cliConnFreeMsgs(conn); - -// tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); -// destroyReq(pReq); - -// addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); -// return true; -// } -// return false; -// } - static FORCE_INLINE void destroyReq(void* arg) { SCliReq* pReq = arg; if (pReq == NULL) { @@ -2330,7 +2295,7 @@ static FORCE_INLINE void destroyReq(void* arg) { } tDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx); - destroyReqCtx(pReq->ctx); + if (pReq->ctx) destroyReqCtx(pReq->ctx); transFreeMsg(pReq->msg.pCont); taosMemoryFree(pReq); } @@ -3562,11 +3527,11 @@ int32_t transFreeConnById(void* pInstRef, int64_t transpointId) { if (pCli == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); } - pCli->type = FreeById; + pCli->type = Normal; tDebug("release conn id %" PRId64 "", transpointId); - STransMsg msg = {.info.handle = (void*)transpointId}; + STransMsg msg = {.msgType = TDMT_SCH_TASK_RELEASE, .info.handle = (void*)transpointId}; msg.info.qId = transpointId; pCli->msg = msg; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index e61b0750f9..1bcd8e8324 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -146,8 +146,6 @@ static void uvAcceptAsyncCb(uv_async_t* handle); static void uvShutDownCb(uv_shutdown_t* req, int status); static void uvPrepareCb(uv_prepare_t* handle); -static bool uvRecvReleaseReq(SSvrConn* conn, STransMsgHead* pHead); - /* * time-consuming task throwed into BG work thread */ @@ -429,12 +427,12 @@ static int8_t uvValidConn(SSvrConn* pConn) { return forbiddenIp; } -static int32_t uvHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { +static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { int32_t code = 0; STrans* pInst = pConn->pInst; - if (pHead->msgType == TDMT_SCH_TASK_RELEASE) { - int64_t qId = taosHton64(pHead->qid); - void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId)); + int64_t qId = taosHton64(pHead->qid); + if (pHead->msgType == TDMT_SCH_TASK_RELEASE && qId > 0) { + void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId)); if (p == NULL) { code = TSDB_CODE_RPC_NO_STATE; tTrace("conn %p recv release, and releady release by server qid%ld", pConn, qId); @@ -498,7 +496,7 @@ static bool uvHandleReq(SSvrConn* pConn) { } } - if (uvHandleReleaseReq(pConn, pHead)) { + if (uvMayHandleReleaseReq(pConn, pHead)) { return true; } @@ -770,7 +768,7 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { (void)uv_write(req, (uv_stream_t*)pConn->pTcp, pBuf, bufNum, uvOnSendCb); taosMemoryFree(pBuf); } -int32_t uvConnMayHandlsReleaseMsg(SSvrMsg* pMsg) { +int32_t uvMayHandleReleaseResp(SSvrMsg* pMsg) { SSvrConn* pConn = pMsg->pConn; int64_t qid = pMsg->msg.info.qId; if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) { @@ -788,7 +786,7 @@ int32_t uvConnMayHandlsReleaseMsg(SSvrMsg* pMsg) { static void uvStartSendResp(SSvrMsg* smsg) { // impl SSvrConn* pConn = smsg->pConn; - if (uvConnMayHandlsReleaseMsg(smsg) == TSDB_CODE_RPC_NO_STATE) { + if (uvMayHandleReleaseResp(smsg) == TSDB_CODE_RPC_NO_STATE) { destroySmsg(smsg); return; } @@ -888,40 +886,40 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { uv_close((uv_handle_t*)req->handle, uvDestroyConn); taosMemoryFree(req); } -static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { - if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { - tTrace("conn %p received release request", pConn); - STraceId traceId = pHead->traceId; - (void)transClearBuffer(&pConn->readBuf); - transFreeMsg(transContFromHead((char*)pHead)); - if (pConn->status != ConnAcquire) { - return true; - } - pConn->status = ConnRelease; +// static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { +// if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { +// tTrace("conn %p received release request", pConn); +// STraceId traceId = pHead->traceId; +// (void)transClearBuffer(&pConn->readBuf); +// transFreeMsg(transContFromHead((char*)pHead)); +// if (pConn->status != ConnAcquire) { +// return true; +// } +// pConn->status = ConnRelease; - STransMsg tmsg = {.code = 0, - .info.handle = (void*)pConn, - .info.traceId = traceId, - .info.ahandle = (void*)0x9527, - .info.seqNum = htonl(pHead->seqNum)}; - SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); - srvMsg->msg = tmsg; - srvMsg->type = Release; - srvMsg->pConn = pConn; - if (!transQueuePush(&pConn->srvMsgs, srvMsg)) { - return true; - } - if (pConn->regArg.init) { - tTrace("conn %p release, notify server app", pConn); - STrans* pInst = pConn->pInst; - (*pInst->cfp)(pInst->parent, &(pConn->regArg.msg), NULL); - memset(&pConn->regArg, 0, sizeof(pConn->regArg)); - } - uvStartSendRespImpl(srvMsg); - return true; - } - return false; -} +// STransMsg tmsg = {.code = 0, +// .info.handle = (void*)pConn, +// .info.traceId = traceId, +// .info.ahandle = (void*)0x9527, +// .info.seqNum = htonl(pHead->seqNum)}; +// SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); +// srvMsg->msg = tmsg; +// srvMsg->type = Release; +// srvMsg->pConn = pConn; +// if (!transQueuePush(&pConn->srvMsgs, srvMsg)) { +// return true; +// } +// if (pConn->regArg.init) { +// tTrace("conn %p release, notify server app", pConn); +// STrans* pInst = pConn->pInst; +// (*pInst->cfp)(pInst->parent, &(pConn->regArg.msg), NULL); +// memset(&pConn->regArg, 0, sizeof(pConn->regArg)); +// } +// uvStartSendRespImpl(srvMsg); +// return true; +// } +// return false; +// } static void uvWorkDoTask(uv_work_t* req) { // doing time-consumeing task // only auth conn currently, add more func later