From 2f487130ce3b8b53b189a21db217eb5d08eb9d74 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 12 Sep 2024 17:39:39 +0800 Subject: [PATCH] opt parameter --- source/libs/transport/src/transCli.c | 20 ++++----- source/libs/transport/src/transSvr.c | 63 +++++++++------------------- 2 files changed, 27 insertions(+), 56 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3718331358..4e7515b12c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -465,7 +465,9 @@ int32_t cliConnMayHandleState_releaseReq(SCliConn* conn, STransMsgHead* pHead) { if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) { int64_t qId = taosHton64(pHead->qid); STraceId* trace = &pHead->traceId; - tGDebug("%s conn %p receive release req, qid:%ld", CONN_GET_INST_LABEL(conn), conn, qId); + int32_t seqNum = htonl(pHead->seqNum); + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%d, qid:%ld", CONN_GET_INST_LABEL(conn), + conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, seqNum, qId); STransCtx* p = taosHashGet(conn->pQTable, &qId, sizeof(qId)); transCtxCleanup(p); @@ -569,8 +571,8 @@ void cliHandleResp2(SCliConn* conn) { qId, tstrerror(code)); } if (code != 0) { - tDebug("%s conn %p recv unexpected packet, seqNum:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq, - tstrerror(code)); + tDebug("%s conn %p recv unexpected packet, seqNum:%d, qId:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq, + qId, tstrerror(code)); // TODO: notify cb if (cliMayRecycleConn(conn)) { return; @@ -3433,7 +3435,7 @@ int32_t transAllocHandle(int64_t* refId) { QUEUE_INIT(&exh->q); taosInitRWLatch(&exh->latch); - tDebug("alloc qid:%ld", exh->refId); + tDebug("trans alloc qid:%ld", exh->refId); *refId = exh->refId; return 0; } @@ -3459,7 +3461,7 @@ int32_t transFreeConnById(void* pInstRef, int64_t transpointId) { } pCli->type = Normal; - tDebug("release conn id %" PRId64 "", transpointId); + tDebug("%s release conn id %" PRId64 "", pInst->label, transpointId); STransMsg msg = {.msgType = TDMT_SCH_TASK_RELEASE, .info.handle = (void*)transpointId}; msg.info.qId = transpointId; @@ -3513,14 +3515,6 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { return NULL; } code = transHeapGet(pHeap, &pConn); - // if (pConn && taosHashGetSifze(pConn->pQTable) > 0) { - // tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap, - // pConn->reqRefCnt); return NULL; - // } /*else { - // // tDebug("failed to get conn from heap cache for key:%s", key); - // // return NULL; - // }*/ - if (code != 0) { tDebug("failed to get conn from heap cache for key:%s", key); return NULL; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 96e369af9a..30b9053979 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -167,9 +167,6 @@ static void uvNotifyLinkBrokenToApp(SSvrConn* conn); static FORCE_INLINE void destroySmsg(SSvrRespMsg* smsg); static FORCE_INLINE SSvrConn* createConn(void* hThrd); static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); -// static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn); - -static int32_t reallocConnRef(SSvrConn* conn); int32_t uvGetConnRefOfThrd(SWorkThrd* thrd) { return thrd ? thrd->connRefMgt : -1; } @@ -1309,43 +1306,26 @@ static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) { } } } -// static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) { -// if (conn->regArg.init == 1) { -// transFreeMsg(conn->regArg.msg.pCont); -// conn->regArg.init = 0; -// } -// } -static int32_t reallocConnRef(SSvrConn* conn) { - if (conn->refId > 0) { - (void)transReleaseExHandle(uvGetConnRefOfThrd(conn->hostThrd), conn->refId); - (void)transRemoveExHandle(uvGetConnRefOfThrd(conn->hostThrd), conn->refId); - } - // avoid app continue to send msg on invalid handle - SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); - if (exh == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + +void uvConnDestroyAllState(SSvrConn* p) { + STrans* pInst = p->pInst; + SHashObj* pQTable = p->pQTable; + if (pQTable == NULL) return; + + void* pIter = taosHashIterate(pQTable, NULL); + while (pIter) { + SSvrRegArg* arg = pIter; + int64_t* qid = taosHashGetKey(pIter, NULL); + (pInst->cfp)(pInst->parent, &(arg->msg), NULL); + tTrace("conn %p broken, notify server app, qid%ld", p, *qid); + pIter = taosHashIterate(pQTable, pIter); } - exh->handle = conn; - exh->pThrd = conn->hostThrd; - exh->refId = transAddExHandle(uvGetConnRefOfThrd(conn->hostThrd), exh); - if (exh->refId < 0) { - taosMemoryFree(exh); - return TSDB_CODE_REF_INVALID_ID; - } - - QUEUE_INIT(&exh->q); - SExHandle* pSelf = transAcquireExHandle(uvGetConnRefOfThrd(conn->hostThrd), exh->refId); - if (pSelf != exh) { - tError("conn %p failed to acquire handle", conn); - taosMemoryFree(exh); - return TSDB_CODE_REF_INVALID_ID; - } - - conn->refId = exh->refId; - - return 0; + taosHashCleanup(pQTable); + pQTable = NULL; + return; } + static void uvDestroyConn(uv_handle_t* handle) { SSvrConn* conn = handle->data; @@ -1360,18 +1340,15 @@ static void uvDestroyConn(uv_handle_t* handle) { STrans* pInst = thrd->pInst; tDebug("%s conn %p destroy", transLabel(pInst), conn); - // for (int i = 0; i < transQueueSize(&conn->resps); i++) { - // SSvrRespMsg* msg = transQueueGet(&conn->resps, i); - // destroySmsg(msg); - // } transQueueDestroy(&conn->resps); transReqQueueClear(&conn->wreqQueue); QUEUE_REMOVE(&conn->queue); - taosHashCleanup(conn->pQTable); taosMemoryFree(conn->pTcp); - // destroyConnRegArg(conn); + + uvConnDestroyAllState(conn); + (void)transDestroyBuffer(&conn->readBuf); taosMemoryFree(conn);