From f27ff536eb644ecaaac72933331c033278b8c627 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 11 Oct 2024 18:11:45 +0800 Subject: [PATCH] Merge remote-tracking branch 'origin/3.0' into enh/opt-transport --- source/libs/transport/src/transCli.c | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index aed1e9339a..53dd5291cb 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -316,6 +316,7 @@ typedef struct { Heap* heap; int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b); int64_t lastUpdateTs; + int64_t lastConnFailTs; } SHeap; int32_t compareHeapNode(const HeapNode* a, const HeapNode* b); @@ -325,6 +326,7 @@ int32_t transHeapGet(SHeap* heap, SCliConn** p); int32_t transHeapInsert(SHeap* heap, SCliConn* p); int32_t transHeapDelete(SHeap* heap, SCliConn* p); int32_t transHeapBalance(SHeap* heap, SCliConn* p); +int32_t transHeapUpdateFailTs(SHeap* heap, SCliConn* p); #define CLI_RELEASE_UV(loop) \ do { \ @@ -978,7 +980,9 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) code = cliHandleState_mayUpdateState(pConn, pReq); code = addConnToHeapCache(pThrd->connHeapCache, pConn); - if (code != 0) { + // code = 0, succ + // code = 0, + if (code != 0 && code != TSDB_CODE_RPC_NETWORK_UNAVAIL) { TAOS_CHECK_GOTO(code, NULL, _exception); } transQueuePush(&pConn->reqsToSend, &pReq->q); @@ -1527,6 +1531,7 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) { ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { tError("failed connect to %s since %s", conn->dstAddr, uv_err_name(ret)); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1); } @@ -3767,6 +3772,12 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) { if (code != 0) { return code; } + if (pConn->connnected == 0) { + int64_t now = taosGetTimestampMs(); + if (now - p->lastConnFailTs < 3000) { + return TSDB_CODE_RPC_NETWORK_UNAVAIL; + } + } } code = transHeapInsert(p, pConn); @@ -3860,6 +3871,9 @@ int32_t transHeapInsert(SHeap* heap, SCliConn* p) { } int32_t transHeapDelete(SHeap* heap, SCliConn* p) { // impl later + if (p->connnected == 0) { + transHeapUpdateFailTs(heap, p); + } if (p->inHeap == 0) { tDebug("failed to del conn %p since not in heap", p); return 0; @@ -3877,6 +3891,11 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p) { return 0; } +int32_t transHeapUpdateFailTs(SHeap* heap, SCliConn* p) { + heap->lastConnFailTs = taosGetTimestampMs(); + return 0; +} + int32_t transHeapBalance(SHeap* heap, SCliConn* p) { if (p->inHeap == 0 && heap == NULL || heap->heap == NULL) { return 0;