fix: limit session num
This commit is contained in:
parent
663768a365
commit
4a196e74af
|
@ -635,12 +635,12 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
||||||
plist->list = nList;
|
plist->list = nList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STraceId* trace = &(*pMsg)->msg.info.traceId;
|
||||||
// no avaliable conn in pool
|
// no avaliable conn in pool
|
||||||
if (QUEUE_IS_EMPTY(&plist->conns)) {
|
if (QUEUE_IS_EMPTY(&plist->conns)) {
|
||||||
SMsgList* list = plist->list;
|
SMsgList* list = plist->list;
|
||||||
if ((list)->numOfConn >= pTransInst->connLimitNum) {
|
if ((list)->numOfConn >= pTransInst->connLimitNum) {
|
||||||
STraceId* trace = &(*pMsg)->msg.info.traceId;
|
STraceId* trace = &(*pMsg)->msg.info.traceId;
|
||||||
|
|
||||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||||
arg->param1 = *pMsg;
|
arg->param1 = *pMsg;
|
||||||
arg->param2 = pThrd;
|
arg->param2 = pThrd;
|
||||||
|
@ -657,6 +657,8 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
||||||
arg->param1 = *pMsg;
|
arg->param1 = *pMsg;
|
||||||
arg->param2 = pThrd;
|
arg->param2 = pThrd;
|
||||||
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
|
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
|
||||||
|
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label,
|
||||||
|
TMSG_INFO((*pMsg)->msg.msgType));
|
||||||
|
|
||||||
QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
|
QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
|
||||||
queue* h = QUEUE_HEAD(&(list)->msgQ);
|
queue* h = QUEUE_HEAD(&(list)->msgQ);
|
||||||
|
@ -664,6 +666,9 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
||||||
SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q);
|
SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q);
|
||||||
|
|
||||||
*pMsg = ans;
|
*pMsg = ans;
|
||||||
|
|
||||||
|
trace = &(*pMsg)->msg.info.traceId;
|
||||||
|
tGTrace("%s msg %s pop from delay queue, start to send", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
|
||||||
transDQCancel(pThrd->waitConnQueue, ans->ctx->task);
|
transDQCancel(pThrd->waitConnQueue, ans->ctx->task);
|
||||||
}
|
}
|
||||||
list->numOfConn++;
|
list->numOfConn++;
|
||||||
|
@ -860,6 +865,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||||
if (conn->list != NULL) {
|
if (conn->list != NULL) {
|
||||||
SConnList* connList = conn->list;
|
SConnList* connList = conn->list;
|
||||||
connList->list->numOfConn--;
|
connList->list->numOfConn--;
|
||||||
|
connList->size--;
|
||||||
} else {
|
} else {
|
||||||
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
|
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
|
||||||
connList->list->numOfConn--;
|
connList->list->numOfConn--;
|
||||||
|
|
Loading…
Reference in New Issue