Merge branch '3.0' into enh/opt-transport

This commit is contained in:
yihaoDeng 2024-09-16 08:22:49 +08:00
parent 832358bd5e
commit e9d84f8131
2 changed files with 21 additions and 20 deletions

View File

@ -501,6 +501,10 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
code);
// called if drop task rsp received code
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error
if (pMsg->handle == NULL) {
ASSERT(0);
}
if (pMsg) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);

View File

@ -216,7 +216,7 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
static void destroyCliConnQTable(SCliConn* conn);
static void cliHandleBatch_shareConnExcept(SCliConn* conn);
static void cliHandleException(SCliConn* conn);
static int32_t allocConnRef(SCliConn* conn, bool update);
static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp);
@ -976,7 +976,7 @@ _failed:
taosMemoryFree(conn);
return code;
}
static void cliDestroyConn(SCliConn* conn, bool clear) { cliHandleBatch_shareConnExcept(conn); }
static void cliDestroyConn(SCliConn* conn, bool clear) { cliHandleException(conn); }
static void cliDestroy(uv_handle_t* handle) {
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
return;
@ -1021,7 +1021,7 @@ static void cliDestroy(uv_handle_t* handle) {
bool filterAllReq(void* e, void* arg) { return 1; }
static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
static void cliHandleException(SCliConn* conn) {
int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
@ -1046,6 +1046,7 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
resp.msgType = pReq ? pReq->msg.msgType + 1 : 0;
resp.info.cliVer = pInst->compatibilityVer;
resp.info.ahandle = pCtx ? pCtx->ahandle : 0;
resp.info.handle = pReq->msg.info.handle;
if (pReq) {
resp.info.traceId = pReq->msg.info.traceId;
}
@ -1065,9 +1066,11 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
destroyReq(pReq);
}
}
int8_t ref = transGetRefCount(conn);
if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) {
uv_close((uv_handle_t*)conn->stream, cliDestroy);
if (conn->registered) {
int8_t ref = transGetRefCount(conn);
if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) {
uv_close((uv_handle_t*)conn->stream, cliDestroy);
}
}
}
@ -1296,19 +1299,13 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) {
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
_exception1:
tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, uv_err_name(code));
// taosMemoryFree(conn); // free conn later
tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, tstrerror(code));
cliDestroyConn(conn, true);
return code;
_exception2:
// already registered to uv, callback handle error
tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, uv_err_name(code));
// cliRmReqFromConn(conn, NULL);
// cliResetConnTimer(conn);
// cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
// // taosMemoryFree(conn);
transUnrefCliHandle(conn);
tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, tstrerror(code));
return code;
}
@ -1639,9 +1636,11 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) {
} else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
// do nothing, notiy
return;
} else {
/// ASSERT(code == 0);
} else if (code == 0) {
(void)addConnToHeapCache(pThrd->connHeapCache, pConn);
} else {
// do nothing, notiy
return;
}
}
code = cliHandleState_mayUpdateState(pThrd, pReq, pConn);
@ -2607,8 +2606,6 @@ static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
if (exh == NULL) {
return NULL;
} else {
tDebug("onn %p got", exh->handle);
}
taosWLockLatch(&exh->latch);
if (exh->pThrd == NULL && trans != NULL) {