Merge remote-tracking branch 'origin/3.0' into enh/opt-transport
This commit is contained in:
parent
331235a828
commit
437f9d64b4
|
@ -3680,7 +3680,7 @@ static int8_t cliConnRemoveTimeoutMsg(SCliConn* pConn) {
|
||||||
if (QUEUE_IS_EMPTY(&set)) {
|
if (QUEUE_IS_EMPTY(&set)) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
tDebug("%s conn %p do remove timeout msg", pInst->label, pConn);
|
tWarn("%s conn %p do remove timeout msg", pInst->label, pConn);
|
||||||
destroyReqInQueue(pConn, &set, TSDB_CODE_RPC_TIMEOUT);
|
destroyReqInQueue(pConn, &set, TSDB_CODE_RPC_TIMEOUT);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
@ -3698,13 +3698,13 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) {
|
||||||
if (pConn->list == NULL && pConn->dstAddr != NULL) {
|
if (pConn->list == NULL && pConn->dstAddr != NULL) {
|
||||||
pConn->list = taosHashGet((SHashObj*)pThrd->pool, pConn->dstAddr, strlen(pConn->dstAddr));
|
pConn->list = taosHashGet((SHashObj*)pThrd->pool, pConn->dstAddr, strlen(pConn->dstAddr));
|
||||||
if (pConn->list != NULL) {
|
if (pConn->list != NULL) {
|
||||||
tDebug("conn %p get list %p from pool for key:%s", pConn, pConn->list, key);
|
tTrace("conn %p get list %p from pool for key:%s", pConn, pConn->list, key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pConn->list && pConn->list->totaSize >= pInst->connLimitNum / 4) {
|
if (pConn->list && pConn->list->totaSize >= pInst->connLimitNum / 4) {
|
||||||
tWarn("%s conn %p try to remove timeout msg since too many conn created", transLabel(pInst), pConn);
|
tWarn("%s conn %p try to remove timeout msg since too many conn created", transLabel(pInst), pConn);
|
||||||
if (cliConnRemoveTimeoutMsg(pConn)) {
|
if (cliConnRemoveTimeoutMsg(pConn)) {
|
||||||
tDebug("%s conn %p succ to remove timeout msg", transLabel(pInst), pConn);
|
tWarn("%s conn %p succ to remove timeout msg", transLabel(pInst), pConn);
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
@ -3740,18 +3740,18 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
|
||||||
SCliConn* pConn = NULL;
|
SCliConn* pConn = NULL;
|
||||||
code = getOrCreateHeap(pConnHeapCache, key, &pHeap);
|
code = getOrCreateHeap(pConnHeapCache, key, &pHeap);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tDebug("failed to get conn heap from cache for key:%s", key);
|
tTrace("failed to get conn heap from cache for key:%s", key);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
code = transHeapGet(pHeap, &pConn);
|
code = transHeapGet(pHeap, &pConn);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tDebug("failed to get conn from heap cache for key:%s", key);
|
tTrace("failed to get conn from heap cache for key:%s", key);
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
if (shouldSWitchToOtherConn(pConn, key)) {
|
if (shouldSWitchToOtherConn(pConn, key)) {
|
||||||
code = balanceConnHeapCache(pConnHeapCache, pConn);
|
code = balanceConnHeapCache(pConnHeapCache, pConn);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tDebug("failed to balance conn heap cache for key:%s", key);
|
tTrace("failed to balance conn heap cache for key:%s", key);
|
||||||
}
|
}
|
||||||
logConnMissHit(pConn);
|
logConnMissHit(pConn);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -3766,7 +3766,7 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
|
||||||
|
|
||||||
if (pConn->heap != NULL) {
|
if (pConn->heap != NULL) {
|
||||||
p = pConn->heap;
|
p = pConn->heap;
|
||||||
tDebug("conn %p add to heap cache for key:%s,status:%d, refCnt:%d, add direct", pConn, pConn->dstAddr,
|
tTrace("conn %p add to heap cache for key:%s,status:%d, refCnt:%d, add direct", pConn, pConn->dstAddr,
|
||||||
pConn->inHeap, pConn->reqRefCnt);
|
pConn->inHeap, pConn->reqRefCnt);
|
||||||
} else {
|
} else {
|
||||||
code = getOrCreateHeap(pConnHeapCacahe, pConn->dstAddr, &p);
|
code = getOrCreateHeap(pConnHeapCacahe, pConn->dstAddr, &p);
|
||||||
|
@ -3782,25 +3782,25 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
code = transHeapInsert(p, pConn);
|
code = transHeapInsert(p, pConn);
|
||||||
tDebug("conn %p add to heap cache for key:%s,status:%d, refCnt:%d", pConn, pConn->dstAddr, pConn->inHeap,
|
tTrace("conn %p add to heap cache for key:%s,status:%d, refCnt:%d", pConn, pConn->dstAddr, pConn->inHeap,
|
||||||
pConn->reqRefCnt);
|
pConn->reqRefCnt);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
|
static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
|
||||||
if (pConn->heap != NULL) {
|
if (pConn->heap != NULL) {
|
||||||
tDebug("conn %p delete from heap cache direct", pConn);
|
tTrace("conn %p try to delete from heap cache direct", pConn);
|
||||||
return transHeapDelete(pConn->heap, pConn);
|
return transHeapDelete(pConn->heap, pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
SHeap* p = taosHashGet(pConnHeapCache, pConn->dstAddr, strlen(pConn->dstAddr));
|
SHeap* p = taosHashGet(pConnHeapCache, pConn->dstAddr, strlen(pConn->dstAddr));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
tDebug("failed to get heap cache for key:%s, no need to del", pConn->dstAddr);
|
tTrace("failed to get heap cache for key:%s, no need to del", pConn->dstAddr);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int32_t code = transHeapDelete(p, pConn);
|
int32_t code = transHeapDelete(p, pConn);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tDebug("conn %p failed delete from heap cache since %s", pConn, tstrerror(code));
|
tTrace("conn %p failed delete from heap cache since %s", pConn, tstrerror(code));
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3861,7 +3861,7 @@ int32_t transHeapInsert(SHeap* heap, SCliConn* p) {
|
||||||
// impl later
|
// impl later
|
||||||
p->reqRefCnt++;
|
p->reqRefCnt++;
|
||||||
if (p->inHeap == 1) {
|
if (p->inHeap == 1) {
|
||||||
tDebug("failed to insert conn %p since already in heap", p);
|
tTrace("failed to insert conn %p since already in heap", p);
|
||||||
return TSDB_CODE_DUP_KEY;
|
return TSDB_CODE_DUP_KEY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3877,12 +3877,12 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p) {
|
||||||
transHeapUpdateFailTs(heap, p);
|
transHeapUpdateFailTs(heap, p);
|
||||||
}
|
}
|
||||||
if (p->inHeap == 0) {
|
if (p->inHeap == 0) {
|
||||||
tDebug("failed to del conn %p since not in heap", p);
|
tTrace("failed to del conn %p since not in heap", p);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
if (now - p->lastAddHeapTime < 10000) {
|
if (now - p->lastAddHeapTime < 10000) {
|
||||||
tDebug("conn %p not added to heap frequently", p);
|
tTrace("conn %p not added/delete to heap frequently", p);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3891,11 +3891,11 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p) {
|
||||||
p->reqRefCnt--;
|
p->reqRefCnt--;
|
||||||
if (p->reqRefCnt == 0) {
|
if (p->reqRefCnt == 0) {
|
||||||
heapRemove(heap->heap, &p->node);
|
heapRemove(heap->heap, &p->node);
|
||||||
tDebug("conn %p delete from heap", p);
|
tTrace("conn %p delete from heap", p);
|
||||||
} else if (p->reqRefCnt < 0) {
|
} else if (p->reqRefCnt < 0) {
|
||||||
tDebug("conn %p has %d reqs, not delete from heap,assert", p, p->reqRefCnt);
|
tTrace("conn %p has %d reqs, not delete from heap,assert", p, p->reqRefCnt);
|
||||||
} else {
|
} else {
|
||||||
tDebug("conn %p has %d reqs, not delete from heap", p, p->reqRefCnt);
|
tTrace("conn %p has %d reqs, not delete from heap", p, p->reqRefCnt);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue