Merge remote-tracking branch 'origin/3.0' into enh/opt-transport
This commit is contained in:
parent
731f9b3fb7
commit
f27ff536eb
|
@ -316,6 +316,7 @@ typedef struct {
|
||||||
Heap* heap;
|
Heap* heap;
|
||||||
int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b);
|
int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b);
|
||||||
int64_t lastUpdateTs;
|
int64_t lastUpdateTs;
|
||||||
|
int64_t lastConnFailTs;
|
||||||
} SHeap;
|
} SHeap;
|
||||||
|
|
||||||
int32_t compareHeapNode(const HeapNode* a, const HeapNode* b);
|
int32_t compareHeapNode(const HeapNode* a, const HeapNode* b);
|
||||||
|
@ -325,6 +326,7 @@ int32_t transHeapGet(SHeap* heap, SCliConn** p);
|
||||||
int32_t transHeapInsert(SHeap* heap, SCliConn* p);
|
int32_t transHeapInsert(SHeap* heap, SCliConn* p);
|
||||||
int32_t transHeapDelete(SHeap* heap, SCliConn* p);
|
int32_t transHeapDelete(SHeap* heap, SCliConn* p);
|
||||||
int32_t transHeapBalance(SHeap* heap, SCliConn* p);
|
int32_t transHeapBalance(SHeap* heap, SCliConn* p);
|
||||||
|
int32_t transHeapUpdateFailTs(SHeap* heap, SCliConn* p);
|
||||||
|
|
||||||
#define CLI_RELEASE_UV(loop) \
|
#define CLI_RELEASE_UV(loop) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -978,7 +980,9 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn)
|
||||||
code = cliHandleState_mayUpdateState(pConn, pReq);
|
code = cliHandleState_mayUpdateState(pConn, pReq);
|
||||||
|
|
||||||
code = addConnToHeapCache(pThrd->connHeapCache, pConn);
|
code = addConnToHeapCache(pThrd->connHeapCache, pConn);
|
||||||
if (code != 0) {
|
// code = 0, succ
|
||||||
|
// code = 0,
|
||||||
|
if (code != 0 && code != TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||||
TAOS_CHECK_GOTO(code, NULL, _exception);
|
TAOS_CHECK_GOTO(code, NULL, _exception);
|
||||||
}
|
}
|
||||||
transQueuePush(&pConn->reqsToSend, &pReq->q);
|
transQueuePush(&pConn->reqsToSend, &pReq->q);
|
||||||
|
@ -1527,6 +1531,7 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) {
|
||||||
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("failed connect to %s since %s", conn->dstAddr, uv_err_name(ret));
|
tError("failed connect to %s since %s", conn->dstAddr, uv_err_name(ret));
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
|
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3767,6 +3772,12 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
if (pConn->connnected == 0) {
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
|
if (now - p->lastConnFailTs < 3000) {
|
||||||
|
return TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = transHeapInsert(p, pConn);
|
code = transHeapInsert(p, pConn);
|
||||||
|
@ -3860,6 +3871,9 @@ int32_t transHeapInsert(SHeap* heap, SCliConn* p) {
|
||||||
}
|
}
|
||||||
int32_t transHeapDelete(SHeap* heap, SCliConn* p) {
|
int32_t transHeapDelete(SHeap* heap, SCliConn* p) {
|
||||||
// impl later
|
// impl later
|
||||||
|
if (p->connnected == 0) {
|
||||||
|
transHeapUpdateFailTs(heap, p);
|
||||||
|
}
|
||||||
if (p->inHeap == 0) {
|
if (p->inHeap == 0) {
|
||||||
tDebug("failed to del conn %p since not in heap", p);
|
tDebug("failed to del conn %p since not in heap", p);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -3877,6 +3891,11 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t transHeapUpdateFailTs(SHeap* heap, SCliConn* p) {
|
||||||
|
heap->lastConnFailTs = taosGetTimestampMs();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t transHeapBalance(SHeap* heap, SCliConn* p) {
|
int32_t transHeapBalance(SHeap* heap, SCliConn* p) {
|
||||||
if (p->inHeap == 0 && heap == NULL || heap->heap == NULL) {
|
if (p->inHeap == 0 && heap == NULL || heap->heap == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue