diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2aeffc6395..560a501f2b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -254,6 +254,7 @@ static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst); // process data read from server, add decompress etc later // handle except about conn +#define REQS_ON_CONN(conn) (conn ? (transQueueSize(&conn->reqsToSend) + transQueueSize(&conn->reqsSentOut)) : 0) static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code); // handle req from app static void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq); @@ -289,7 +290,7 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key); static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn); static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn); -static int32_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn); +static int8_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn, SCliConn** pNewConn); // thread obj static int32_t createThrdObj(void* trans, SCliThrd** pThrd); @@ -327,14 +328,18 @@ typedef struct { int64_t lastConnFailTs; } SHeap; -int32_t compareHeapNode(const HeapNode* a, const HeapNode* b); -int32_t transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)); -void transHeapDestroy(SHeap* heap); -int32_t transHeapGet(SHeap* heap, SCliConn** p); -int32_t transHeapInsert(SHeap* heap, SCliConn* p); -int32_t transHeapDelete(SHeap* heap, SCliConn* p); -int32_t transHeapBalance(SHeap* heap, SCliConn* p); -int32_t transHeapUpdateFailTs(SHeap* heap, SCliConn* p); +static int32_t compareHeapNode(const HeapNode* a, const HeapNode* b); +static int32_t transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)); +static void transHeapDestroy(SHeap* heap); + +static int32_t transHeapGet(SHeap* heap, SCliConn** p); +static int32_t transHeapInsert(SHeap* heap, SCliConn* p); +static int32_t transHeapDelete(SHeap* heap, SCliConn* p); +static int32_t transHeapBalance(SHeap* heap, SCliConn* p); +static int32_t transHeapUpdateFailTs(SHeap* heap, SCliConn* p); +static int32_t transHeapMayBalance(SHeap* heap, SCliConn* p); + +static FORCE_INLINE void logConnMissHit(SCliConn* pConn); #define CLI_RELEASE_UV(loop) \ do { \ @@ -498,6 +503,8 @@ int8_t cliMayRecycleConn(SCliConn* conn) { if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { tDebug("%s conn %p failed to remove conn from heap cache since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); + + TAOS_UNUSED(transHeapMayBalance(conn->heap, conn)); return 1; } else { if (code != 0) { @@ -510,31 +517,10 @@ int8_t cliMayRecycleConn(SCliConn* conn) { } else if ((transQueueSize(&conn->reqsToSend) == 0) && (transQueueSize(&conn->reqsSentOut) == 0) && (taosHashGetSize(conn->pQTable) != 0)) { tDebug("%s conn %p do balance directly", CONN_GET_INST_LABEL(conn), conn); - TAOS_UNUSED(transHeapBalance(conn->heap, conn)); + TAOS_UNUSED(transHeapMayBalance(conn->heap, conn)); } else { - SCliConn* topConn = NULL; - if (conn->heap != NULL) { - code = transHeapGet(conn->heap, &topConn); - if (code != 0) { - tDebug("%s conn %p failed to get top conn since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); - return 0; - } - - if (topConn == conn) { - return 0; - } - int32_t topReqs = transQueueSize(&topConn->reqsSentOut) + transQueueSize(&topConn->reqsToSend); - int32_t currReqs = transQueueSize(&conn->reqsSentOut) + transQueueSize(&conn->reqsToSend); - if (topReqs <= currReqs) { - tTrace("%s conn %p not balance conn heap since top conn has less req, topConnReqs:%d, currConnReqs:%d", - CONN_GET_INST_LABEL(conn), conn, topReqs, currReqs); - return 0; - } else { - tDebug("%s conn %p do balance conn heap since top conn has more reqs, topConnReqs:%d, currConnReqs:%d", - CONN_GET_INST_LABEL(conn), conn, topReqs, currReqs); - TAOS_UNUSED(transHeapBalance(conn->heap, conn)); - } - } + tTrace("%s conn %p may do balance", CONN_GET_INST_LABEL(conn), conn); + TAOS_UNUSED(transHeapMayBalance(conn->heap, conn)); } return 0; } @@ -785,15 +771,8 @@ void cliConnCheckTimoutMsg(SCliConn* conn) { if (transQueueSize(&conn->reqsSentOut) == 0) { return; } - code = cliConnRemoveTimeoutMsg(conn); - if (code != 0) { - tDebug("%s conn %p do remove timeout msg", CONN_GET_INST_LABEL(conn), conn); - if (!cliMayRecycleConn(conn)) { - TAOS_UNUSED(transHeapBalance(conn->heap, conn)); - } - } else { - TAOS_UNUSED(cliMayRecycleConn(conn)); - } + TAOS_UNUSED(cliConnRemoveTimeoutMsg(conn)); + TAOS_UNUSED(cliMayRecycleConn(conn)); } void cliConnTimeout__checkReq(uv_timer_t* handle) { SCliConn* conn = handle->data; @@ -3804,6 +3783,8 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) { int32_t totalReqs = reqsNum + reqsSentOut; if (totalReqs >= pInst->shareConnLimit) { + logConnMissHit(pConn); + if (pConn->list == NULL && pConn->dstAddr != NULL) { pConn->list = taosHashGet((SHashObj*)pThrd->pool, pConn->dstAddr, strlen(pConn->dstAddr)); if (pConn->list != NULL) { @@ -3860,11 +3841,12 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { } else { tTrace("conn %p get conn from heap cache for key:%s", pConn, key); if (shouldSWitchToOtherConn(pConn, key)) { - code = balanceConnHeapCache(pConnHeapCache, pConn); - if (code != 0) { - tTrace("failed to balance conn heap cache for key:%s", key); + SCliConn* pNewConn = NULL; + code = balanceConnHeapCache(pConnHeapCache, pConn, &pNewConn); + if (code == 1) { + tTrace("conn %p start to handle reqs", key, pNewConn); + return pNewConn; } - logConnMissHit(pConn); return NULL; } } @@ -3916,15 +3898,19 @@ static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { return code; } -static int32_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { +static int8_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn, SCliConn** pNewConn) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; + SCliConn* pTopConn = NULL; if (pConn->heap != NULL && pConn->inHeap != 0) { - SHeap* heap = pConn->heap; - tTrace("conn %p'heap may should do balance, numOfConn:%d", pConn, (int)(heap->heap->nelts)); - int64_t now = taosGetTimestampMs(); - if (((now - heap->lastUpdateTs) / 1000) > 30) { - heap->lastUpdateTs = now; - tTrace("conn %p'heap do balance, numOfConn:%d", pConn, (int)(heap->heap->nelts)); - return transHeapBalance(pConn->heap, pConn); + TAOS_UNUSED(transHeapBalance(pConn->heap, pConn)); + if (transHeapGet(pConn->heap, &pTopConn) == 0 && pConn != pTopConn) { + int32_t curReqs = REQS_ON_CONN(pConn); + int32_t topReqs = REQS_ON_CONN(pTopConn); + if (curReqs > topReqs && topReqs < pInst->shareConnLimit) { + *pNewConn = pTopConn; + return 1; + } } } return 0; @@ -3934,8 +3920,8 @@ int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) { SCliConn* args1 = container_of(a, SCliConn, node); SCliConn* args2 = container_of(b, SCliConn, node); - int32_t totalReq1 = transQueueSize(&args1->reqsToSend) + transQueueSize(&args1->reqsSentOut); - int32_t totalReq2 = transQueueSize(&args2->reqsToSend) + transQueueSize(&args2->reqsSentOut); + int32_t totalReq1 = REQS_ON_CONN(args1); + int32_t totalReq2 = REQS_ON_CONN(args2); if (totalReq1 > totalReq2) { return 0; } @@ -4016,6 +4002,30 @@ int32_t transHeapUpdateFailTs(SHeap* heap, SCliConn* p) { heap->lastConnFailTs = taosGetTimestampMs(); return 0; } +int32_t transHeapMayBalance(SHeap* heap, SCliConn* p) { + if (p->inHeap == 0 || heap == NULL || heap->heap == NULL) { + return 0; + } + SCliThrd* pThrd = p->hostThrd; + STrans* pInst = pThrd->pInst; + int32_t balanceLimit = pInst->shareConnLimit >= 4 ? pInst->shareConnLimit / 2 : 2; + + SCliConn* topConn = NULL; + int32_t code = transHeapGet(heap, &topConn); + if (code != 0) { + return code; + } + + if (topConn == p) return code; + + int32_t reqsOnTop = REQS_ON_CONN(topConn); + int32_t reqsOnCur = REQS_ON_CONN(p); + + if (reqsOnTop >= balanceLimit && reqsOnCur < balanceLimit) { + TAOS_UNUSED(transHeapBalance(heap, p)); + } + return code; +} int32_t transHeapBalance(SHeap* heap, SCliConn* p) { if (p->inHeap == 0 || heap == NULL || heap->heap == NULL) {