Merge remote-tracking branch 'origin/3.0' into enh/opt-transport

This commit is contained in:
yihaoDeng 2024-09-26 22:02:58 +08:00
parent ce052c9c8a
commit f534731cd9
1 changed files with 1 additions and 46 deletions

View File

@ -91,7 +91,6 @@ typedef struct SCliConn {
char* ipStr; char* ipStr;
int32_t port; int32_t port;
int64_t refId;
int64_t seq; int64_t seq;
int8_t registered; int8_t registered;
@ -216,8 +215,7 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
static void destroyCliConnQTable(SCliConn* conn); static void destroyCliConnQTable(SCliConn* conn);
static void cliHandleException(SCliConn* conn); static void cliHandleException(SCliConn* conn);
static int32_t allocConnRef(SCliConn* conn, bool update);
static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp); static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp);
void cliResetConnTimer(SCliConn* conn); void cliResetConnTimer(SCliConn* conn);
@ -852,41 +850,6 @@ static void addConnToPool(void* pool, SCliConn* conn) {
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, 10 * CONN_PERSIST_TIME(pInst->idleTime)); conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, 10 * CONN_PERSIST_TIME(pInst->idleTime));
} }
} }
static int32_t allocConnRef(SCliConn* conn, bool update) {
if (update) {
TAOS_UNUSED(transReleaseExHandle(transGetRefMgt(), conn->refId));
TAOS_UNUSED(transRemoveExHandle(transGetRefMgt(), conn->refId));
conn->refId = -1;
}
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
if (exh == NULL) {
return terrno;
}
exh->refId = transAddExHandle(transGetRefMgt(), exh);
if (exh->refId < 0) {
taosMemoryFree(exh);
return TSDB_CODE_REF_INVALID_ID;
}
QUEUE_INIT(&exh->q);
taosInitRWLatch(&exh->latch);
exh->handle = conn;
exh->pThrd = conn->hostThrd;
SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId);
if (self != exh) {
taosMemoryFree(exh);
return TSDB_CODE_REF_INVALID_ID;
}
conn->refId = exh->refId;
if (conn->refId < 0) {
taosMemoryFree(exh);
}
return 0;
}
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
@ -997,7 +960,6 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
if (conn->pQTable == NULL) { if (conn->pQTable == NULL) {
TAOS_CHECK_GOTO(terrno, NULL, _failed); TAOS_CHECK_GOTO(terrno, NULL, _failed);
} }
TAOS_CHECK_GOTO(allocConnRef(conn, false), NULL, _failed);
TAOS_CHECK_GOTO(cliGetConnTimer(pThrd, conn), &lino, _failed); TAOS_CHECK_GOTO(cliGetConnTimer(pThrd, conn), &lino, _failed);
@ -1035,9 +997,6 @@ _failed:
transQueueDestroy(&conn->reqsToSend); transQueueDestroy(&conn->reqsToSend);
transQueueDestroy(&conn->reqsSentOut); transQueueDestroy(&conn->reqsSentOut);
taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->dstAddr);
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
} }
tError("failed to create conn, code:%d", code); tError("failed to create conn, code:%d", code);
taosMemoryFree(conn); taosMemoryFree(conn);
@ -1054,10 +1013,6 @@ static void cliDestroy(uv_handle_t* handle) {
(void)destroyAllReqs(conn); (void)destroyAllReqs(conn);
if (conn->refId > 0) {
TAOS_UNUSED(transReleaseExHandle(transGetRefMgt(), conn->refId));
TAOS_UNUSED(transRemoveExHandle(transGetRefMgt(), conn->refId));
}
(void)delConnFromHeapCache(pThrd->connHeapCache, conn); (void)delConnFromHeapCache(pThrd->connHeapCache, conn);
taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->dstAddr);
taosMemoryFree(conn->stream); taosMemoryFree(conn->stream);