From e9d84f81318ba529260fb720e84053dc36ca8cfb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 16 Sep 2024 08:22:49 +0800 Subject: [PATCH] Merge branch '3.0' into enh/opt-transport --- source/libs/scheduler/src/schRemote.c | 4 +++ source/libs/transport/src/transCli.c | 37 ++++++++++++--------------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 40391cea7e..bb06a25ce0 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -501,6 +501,10 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { code); // called if drop task rsp received code (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error + + if (pMsg->handle == NULL) { + ASSERT(0); + } if (pMsg) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b740968007..c9a23d76be 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -216,7 +216,7 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList); static void destroyCliConnQTable(SCliConn* conn); -static void cliHandleBatch_shareConnExcept(SCliConn* conn); +static void cliHandleException(SCliConn* conn); static int32_t allocConnRef(SCliConn* conn, bool update); static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp); @@ -976,7 +976,7 @@ _failed: taosMemoryFree(conn); return code; } -static void cliDestroyConn(SCliConn* conn, bool clear) { cliHandleBatch_shareConnExcept(conn); } +static void cliDestroyConn(SCliConn* conn, bool clear) { cliHandleException(conn); } static void cliDestroy(uv_handle_t* handle) { if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) { return; @@ -1021,7 +1021,7 @@ static void cliDestroy(uv_handle_t* handle) { bool filterAllReq(void* e, void* arg) { return 1; } -static void cliHandleBatch_shareConnExcept(SCliConn* conn) { +static void cliHandleException(SCliConn* conn) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; @@ -1046,6 +1046,7 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { resp.msgType = pReq ? pReq->msg.msgType + 1 : 0; resp.info.cliVer = pInst->compatibilityVer; resp.info.ahandle = pCtx ? pCtx->ahandle : 0; + resp.info.handle = pReq->msg.info.handle; if (pReq) { resp.info.traceId = pReq->msg.info.traceId; } @@ -1065,9 +1066,11 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { destroyReq(pReq); } } - int8_t ref = transGetRefCount(conn); - if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) { - uv_close((uv_handle_t*)conn->stream, cliDestroy); + if (conn->registered) { + int8_t ref = transGetRefCount(conn); + if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) { + uv_close((uv_handle_t*)conn->stream, cliDestroy); + } } } @@ -1296,19 +1299,13 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) { return TSDB_CODE_RPC_ASYNC_IN_PROCESS; _exception1: - tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, uv_err_name(code)); - // taosMemoryFree(conn); // free conn later + tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, tstrerror(code)); + cliDestroyConn(conn, true); return code; _exception2: - // already registered to uv, callback handle error - tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, uv_err_name(code)); - // cliRmReqFromConn(conn, NULL); - - // cliResetConnTimer(conn); - // cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); - - // // taosMemoryFree(conn); + transUnrefCliHandle(conn); + tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, tstrerror(code)); return code; } @@ -1639,9 +1636,11 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) { } else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { // do nothing, notiy return; - } else { - /// ASSERT(code == 0); + } else if (code == 0) { (void)addConnToHeapCache(pThrd->connHeapCache, pConn); + } else { + // do nothing, notiy + return; } } code = cliHandleState_mayUpdateState(pThrd, pReq, pConn); @@ -2607,8 +2606,6 @@ static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); if (exh == NULL) { return NULL; - } else { - tDebug("onn %p got", exh->handle); } taosWLockLatch(&exh->latch); if (exh->pThrd == NULL && trans != NULL) {