opt get conn rule

This commit is contained in:
yihaoDeng 2024-11-10 16:50:36 +08:00
parent f7101a346a
commit 1a433666eb
1 changed files with 66 additions and 56 deletions

View File

@ -254,6 +254,7 @@ static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst);
// process data read from server, add decompress etc later // process data read from server, add decompress etc later
// handle except about conn // handle except about conn
#define REQS_ON_CONN(conn) (conn ? (transQueueSize(&conn->reqsToSend) + transQueueSize(&conn->reqsSentOut)) : 0)
static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code); static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code);
// handle req from app // handle req from app
static void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq); static void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq);
@ -289,7 +290,7 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn);
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key); static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key);
static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn); static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn);
static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn); static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn);
static int32_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn); static int8_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn, SCliConn** pNewConn);
// thread obj // thread obj
static int32_t createThrdObj(void* trans, SCliThrd** pThrd); static int32_t createThrdObj(void* trans, SCliThrd** pThrd);
@ -327,14 +328,18 @@ typedef struct {
int64_t lastConnFailTs; int64_t lastConnFailTs;
} SHeap; } SHeap;
int32_t compareHeapNode(const HeapNode* a, const HeapNode* b); static int32_t compareHeapNode(const HeapNode* a, const HeapNode* b);
int32_t transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)); static int32_t transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b));
void transHeapDestroy(SHeap* heap); static void transHeapDestroy(SHeap* heap);
int32_t transHeapGet(SHeap* heap, SCliConn** p);
int32_t transHeapInsert(SHeap* heap, SCliConn* p); static int32_t transHeapGet(SHeap* heap, SCliConn** p);
int32_t transHeapDelete(SHeap* heap, SCliConn* p); static int32_t transHeapInsert(SHeap* heap, SCliConn* p);
int32_t transHeapBalance(SHeap* heap, SCliConn* p); static int32_t transHeapDelete(SHeap* heap, SCliConn* p);
int32_t transHeapUpdateFailTs(SHeap* heap, SCliConn* p); static int32_t transHeapBalance(SHeap* heap, SCliConn* p);
static int32_t transHeapUpdateFailTs(SHeap* heap, SCliConn* p);
static int32_t transHeapMayBalance(SHeap* heap, SCliConn* p);
static FORCE_INLINE void logConnMissHit(SCliConn* pConn);
#define CLI_RELEASE_UV(loop) \ #define CLI_RELEASE_UV(loop) \
do { \ do { \
@ -498,6 +503,8 @@ int8_t cliMayRecycleConn(SCliConn* conn) {
if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
tDebug("%s conn %p failed to remove conn from heap cache since %s", CONN_GET_INST_LABEL(conn), conn, tDebug("%s conn %p failed to remove conn from heap cache since %s", CONN_GET_INST_LABEL(conn), conn,
tstrerror(code)); tstrerror(code));
TAOS_UNUSED(transHeapMayBalance(conn->heap, conn));
return 1; return 1;
} else { } else {
if (code != 0) { if (code != 0) {
@ -510,31 +517,10 @@ int8_t cliMayRecycleConn(SCliConn* conn) {
} else if ((transQueueSize(&conn->reqsToSend) == 0) && (transQueueSize(&conn->reqsSentOut) == 0) && } else if ((transQueueSize(&conn->reqsToSend) == 0) && (transQueueSize(&conn->reqsSentOut) == 0) &&
(taosHashGetSize(conn->pQTable) != 0)) { (taosHashGetSize(conn->pQTable) != 0)) {
tDebug("%s conn %p do balance directly", CONN_GET_INST_LABEL(conn), conn); tDebug("%s conn %p do balance directly", CONN_GET_INST_LABEL(conn), conn);
TAOS_UNUSED(transHeapBalance(conn->heap, conn)); TAOS_UNUSED(transHeapMayBalance(conn->heap, conn));
} else { } else {
SCliConn* topConn = NULL; tTrace("%s conn %p may do balance", CONN_GET_INST_LABEL(conn), conn);
if (conn->heap != NULL) { TAOS_UNUSED(transHeapMayBalance(conn->heap, conn));
code = transHeapGet(conn->heap, &topConn);
if (code != 0) {
tDebug("%s conn %p failed to get top conn since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
return 0;
}
if (topConn == conn) {
return 0;
}
int32_t topReqs = transQueueSize(&topConn->reqsSentOut) + transQueueSize(&topConn->reqsToSend);
int32_t currReqs = transQueueSize(&conn->reqsSentOut) + transQueueSize(&conn->reqsToSend);
if (topReqs <= currReqs) {
tTrace("%s conn %p not balance conn heap since top conn has less req, topConnReqs:%d, currConnReqs:%d",
CONN_GET_INST_LABEL(conn), conn, topReqs, currReqs);
return 0;
} else {
tDebug("%s conn %p do balance conn heap since top conn has more reqs, topConnReqs:%d, currConnReqs:%d",
CONN_GET_INST_LABEL(conn), conn, topReqs, currReqs);
TAOS_UNUSED(transHeapBalance(conn->heap, conn));
}
}
} }
return 0; return 0;
} }
@ -785,15 +771,8 @@ void cliConnCheckTimoutMsg(SCliConn* conn) {
if (transQueueSize(&conn->reqsSentOut) == 0) { if (transQueueSize(&conn->reqsSentOut) == 0) {
return; return;
} }
code = cliConnRemoveTimeoutMsg(conn); TAOS_UNUSED(cliConnRemoveTimeoutMsg(conn));
if (code != 0) { TAOS_UNUSED(cliMayRecycleConn(conn));
tDebug("%s conn %p do remove timeout msg", CONN_GET_INST_LABEL(conn), conn);
if (!cliMayRecycleConn(conn)) {
TAOS_UNUSED(transHeapBalance(conn->heap, conn));
}
} else {
TAOS_UNUSED(cliMayRecycleConn(conn));
}
} }
void cliConnTimeout__checkReq(uv_timer_t* handle) { void cliConnTimeout__checkReq(uv_timer_t* handle) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
@ -3804,6 +3783,8 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) {
int32_t totalReqs = reqsNum + reqsSentOut; int32_t totalReqs = reqsNum + reqsSentOut;
if (totalReqs >= pInst->shareConnLimit) { if (totalReqs >= pInst->shareConnLimit) {
logConnMissHit(pConn);
if (pConn->list == NULL && pConn->dstAddr != NULL) { if (pConn->list == NULL && pConn->dstAddr != NULL) {
pConn->list = taosHashGet((SHashObj*)pThrd->pool, pConn->dstAddr, strlen(pConn->dstAddr)); pConn->list = taosHashGet((SHashObj*)pThrd->pool, pConn->dstAddr, strlen(pConn->dstAddr));
if (pConn->list != NULL) { if (pConn->list != NULL) {
@ -3860,11 +3841,12 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
} else { } else {
tTrace("conn %p get conn from heap cache for key:%s", pConn, key); tTrace("conn %p get conn from heap cache for key:%s", pConn, key);
if (shouldSWitchToOtherConn(pConn, key)) { if (shouldSWitchToOtherConn(pConn, key)) {
code = balanceConnHeapCache(pConnHeapCache, pConn); SCliConn* pNewConn = NULL;
if (code != 0) { code = balanceConnHeapCache(pConnHeapCache, pConn, &pNewConn);
tTrace("failed to balance conn heap cache for key:%s", key); if (code == 1) {
tTrace("conn %p start to handle reqs", key, pNewConn);
return pNewConn;
} }
logConnMissHit(pConn);
return NULL; return NULL;
} }
} }
@ -3916,15 +3898,19 @@ static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
return code; return code;
} }
static int32_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { static int8_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn, SCliConn** pNewConn) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
SCliConn* pTopConn = NULL;
if (pConn->heap != NULL && pConn->inHeap != 0) { if (pConn->heap != NULL && pConn->inHeap != 0) {
SHeap* heap = pConn->heap; TAOS_UNUSED(transHeapBalance(pConn->heap, pConn));
tTrace("conn %p'heap may should do balance, numOfConn:%d", pConn, (int)(heap->heap->nelts)); if (transHeapGet(pConn->heap, &pTopConn) == 0 && pConn != pTopConn) {
int64_t now = taosGetTimestampMs(); int32_t curReqs = REQS_ON_CONN(pConn);
if (((now - heap->lastUpdateTs) / 1000) > 30) { int32_t topReqs = REQS_ON_CONN(pTopConn);
heap->lastUpdateTs = now; if (curReqs > topReqs && topReqs < pInst->shareConnLimit) {
tTrace("conn %p'heap do balance, numOfConn:%d", pConn, (int)(heap->heap->nelts)); *pNewConn = pTopConn;
return transHeapBalance(pConn->heap, pConn); return 1;
}
} }
} }
return 0; return 0;
@ -3934,8 +3920,8 @@ int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) {
SCliConn* args1 = container_of(a, SCliConn, node); SCliConn* args1 = container_of(a, SCliConn, node);
SCliConn* args2 = container_of(b, SCliConn, node); SCliConn* args2 = container_of(b, SCliConn, node);
int32_t totalReq1 = transQueueSize(&args1->reqsToSend) + transQueueSize(&args1->reqsSentOut); int32_t totalReq1 = REQS_ON_CONN(args1);
int32_t totalReq2 = transQueueSize(&args2->reqsToSend) + transQueueSize(&args2->reqsSentOut); int32_t totalReq2 = REQS_ON_CONN(args2);
if (totalReq1 > totalReq2) { if (totalReq1 > totalReq2) {
return 0; return 0;
} }
@ -4016,6 +4002,30 @@ int32_t transHeapUpdateFailTs(SHeap* heap, SCliConn* p) {
heap->lastConnFailTs = taosGetTimestampMs(); heap->lastConnFailTs = taosGetTimestampMs();
return 0; return 0;
} }
int32_t transHeapMayBalance(SHeap* heap, SCliConn* p) {
if (p->inHeap == 0 || heap == NULL || heap->heap == NULL) {
return 0;
}
SCliThrd* pThrd = p->hostThrd;
STrans* pInst = pThrd->pInst;
int32_t balanceLimit = pInst->shareConnLimit >= 4 ? pInst->shareConnLimit / 2 : 2;
SCliConn* topConn = NULL;
int32_t code = transHeapGet(heap, &topConn);
if (code != 0) {
return code;
}
if (topConn == p) return code;
int32_t reqsOnTop = REQS_ON_CONN(topConn);
int32_t reqsOnCur = REQS_ON_CONN(p);
if (reqsOnTop >= balanceLimit && reqsOnCur < balanceLimit) {
TAOS_UNUSED(transHeapBalance(heap, p));
}
return code;
}
int32_t transHeapBalance(SHeap* heap, SCliConn* p) { int32_t transHeapBalance(SHeap* heap, SCliConn* p) {
if (p->inHeap == 0 || heap == NULL || heap->heap == NULL) { if (p->inHeap == 0 || heap == NULL || heap->heap == NULL) {