fix: limit session num
This commit is contained in:
parent
0993813213
commit
0bbd729995
|
@ -76,6 +76,10 @@ typedef struct SCliConn {
|
||||||
|
|
||||||
} SCliConn;
|
} SCliConn;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t numOfConn;
|
||||||
|
queue msgQ;
|
||||||
|
} SMsgList;
|
||||||
typedef struct SCliMsg {
|
typedef struct SCliMsg {
|
||||||
STransConnCtx* ctx;
|
STransConnCtx* ctx;
|
||||||
STransMsg msg;
|
STransMsg msg;
|
||||||
|
@ -136,7 +140,7 @@ typedef struct {
|
||||||
// add expire timeout and capacity limit
|
// add expire timeout and capacity limit
|
||||||
static void* createConnPool(int size);
|
static void* createConnPool(int size);
|
||||||
static void* destroyConnPool(void* pool);
|
static void* destroyConnPool(void* pool);
|
||||||
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
|
static SCliConn* getConnFromPool(void* pool, char* addr);
|
||||||
static void addConnToPool(void* pool, SCliConn* conn);
|
static void addConnToPool(void* pool, SCliConn* conn);
|
||||||
static void doCloseIdleConn(void* param);
|
static void doCloseIdleConn(void* param);
|
||||||
|
|
||||||
|
@ -176,7 +180,8 @@ static void cliSend(SCliConn* pConn);
|
||||||
static void cliSendBatch(SCliConn* pConn);
|
static void cliSendBatch(SCliConn* pConn);
|
||||||
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
|
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
|
||||||
|
|
||||||
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port);
|
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr);
|
||||||
|
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg);
|
||||||
|
|
||||||
// cli util func
|
// cli util func
|
||||||
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
|
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
|
||||||
|
@ -556,10 +561,7 @@ void* destroyConnPool(void* pool) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
static SCliConn* getConnFromPool(void* pool, char* key) {
|
||||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
|
||||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
|
||||||
|
|
||||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||||
if (plist == NULL) {
|
if (plist == NULL) {
|
||||||
SConnList list = {0};
|
SConnList list = {0};
|
||||||
|
@ -607,6 +609,20 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
|
|
||||||
conn->status = ConnInPool;
|
conn->status = ConnInPool;
|
||||||
|
|
||||||
|
SMsgList** msglist = taosHashGet(thrd->connLimitCache, conn->ip, strlen(conn->ip));
|
||||||
|
if (msglist != NULL && *msglist != NULL) {
|
||||||
|
if (!QUEUE_IS_EMPTY(&(*msglist)->msgQ)) {
|
||||||
|
queue* h = QUEUE_HEAD(&(*msglist)->msgQ);
|
||||||
|
QUEUE_REMOVE(h);
|
||||||
|
|
||||||
|
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||||
|
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
|
||||||
|
transQueuePush(&conn->cliMsgs, pMsg);
|
||||||
|
cliSend(conn);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (conn->list == NULL) {
|
if (conn->list == NULL) {
|
||||||
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
|
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
|
||||||
conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
|
conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
|
||||||
|
@ -774,9 +790,10 @@ static void cliDestroy(uv_handle_t* handle) {
|
||||||
conn->timer->data = NULL;
|
conn->timer->data = NULL;
|
||||||
conn->timer = NULL;
|
conn->timer = NULL;
|
||||||
}
|
}
|
||||||
int32_t* oVal = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip));
|
SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip));
|
||||||
int32_t nVal = oVal == NULL ? 0 : (*oVal) - 1;
|
if (list != NULL && *list != NULL) {
|
||||||
taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nVal, sizeof(nVal));
|
(*list)->numOfConn--;
|
||||||
|
}
|
||||||
|
|
||||||
atomic_sub_fetch_32(&pThrd->connCount, 1);
|
atomic_sub_fetch_32(&pThrd->connCount, 1);
|
||||||
|
|
||||||
|
@ -1009,9 +1026,12 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
SCliBatchList* pList = pBatch->pList;
|
SCliBatchList* pList = pBatch->pList;
|
||||||
|
|
||||||
SCliConn* conn = getConnFromPool(pThrd->pool, pList->ip, pList->port);
|
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||||
|
CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port);
|
||||||
|
|
||||||
if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, pList->ip, pList->port)) {
|
SCliConn* conn = getConnFromPool(pThrd->pool, key);
|
||||||
|
|
||||||
|
if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, key)) {
|
||||||
tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen,
|
tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen,
|
||||||
pBatch->batchSize);
|
pBatch->batchSize);
|
||||||
cliDestroyBatch(pBatch);
|
cliDestroyBatch(pBatch);
|
||||||
|
@ -1067,6 +1087,14 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
||||||
cliHandleFastFail(conn, -1);
|
cliHandleFastFail(conn, -1);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip));
|
||||||
|
if (list == NULL || *list == NULL) {
|
||||||
|
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||||
|
nList->numOfConn++;
|
||||||
|
QUEUE_INIT(&nList->msgQ);
|
||||||
|
taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*));
|
||||||
|
}
|
||||||
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
|
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1173,10 +1201,6 @@ void cliConnCb(uv_connect_t* req, int status) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t* oVal = taosHashGet(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip));
|
|
||||||
int32_t nVal = oVal == NULL ? 0 : (*oVal) + 1;
|
|
||||||
taosHashPut(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip), &nVal, sizeof(nVal));
|
|
||||||
|
|
||||||
struct sockaddr peername, sockname;
|
struct sockaddr peername, sockname;
|
||||||
int addrlen = sizeof(peername);
|
int addrlen = sizeof(peername);
|
||||||
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
|
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
|
||||||
|
@ -1236,7 +1260,7 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
|
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
SCliConn* conn = NULL;
|
SCliConn* conn = NULL;
|
||||||
|
|
||||||
|
@ -1250,7 +1274,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
|
||||||
} else {
|
} else {
|
||||||
conn = exh->handle;
|
conn = exh->handle;
|
||||||
if (conn == NULL) {
|
if (conn == NULL) {
|
||||||
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
|
conn = getConnFromPool(pThrd->pool, addr);
|
||||||
if (conn != NULL) specifyConnRef(conn, true, refId);
|
if (conn != NULL) specifyConnRef(conn, true, refId);
|
||||||
}
|
}
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
@ -1258,7 +1282,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
|
||||||
return conn;
|
return conn;
|
||||||
};
|
};
|
||||||
|
|
||||||
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
|
conn = getConnFromPool(pThrd->pool, addr);
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
|
tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1316,20 +1340,30 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port) {
|
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr) {
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
// STransConnCtx* pCtx = pMsg->ctx;
|
SMsgList** list = taosHashGet(pThrd->connLimitCache, addr, strlen(addr));
|
||||||
// char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
|
if (list == NULL || *list == NULL) {
|
||||||
// int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
if ((*list)->numOfConn >= pTransInst->connLimitNum) {
|
||||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key));
|
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg) {
|
||||||
if (val == NULL) return 0;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
if (*val >= pTransInst->connLimitNum) {
|
SMsgList** list = taosHashGet(pThrd->connLimitCache, addr, strlen(addr));
|
||||||
|
if (list == NULL || *list == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*list)->numOfConn >= pTransInst->connLimitNum) {
|
||||||
|
QUEUE_PUSH(&(*list)->msgQ, &pMsg->q);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1337,36 +1371,22 @@ static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port)
|
||||||
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
STraceId* trace = &pMsg->msg.info.traceId;
|
||||||
|
|
||||||
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
|
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
|
||||||
STraceId* trace = &pMsg->msg.info.traceId;
|
|
||||||
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
|
|
||||||
uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
|
||||||
|
|
||||||
if (!EPSET_IS_VALID(&pCtx->epSet)) {
|
if (!EPSET_IS_VALID(&pCtx->epSet)) {
|
||||||
tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
|
tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
|
char* fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet);
|
||||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
||||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
char addr[TSDB_FQDN_LEN + 64] = {0};
|
||||||
|
CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port);
|
||||||
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
|
|
||||||
if (item != NULL) {
|
|
||||||
int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp);
|
|
||||||
if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) {
|
|
||||||
tGTrace("%s, msg %s cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label,
|
|
||||||
TMSG_INFO(pMsg->msg.msgType), ip, port, item->count, elapse);
|
|
||||||
destroyCmsg(pMsg);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool ignore = false;
|
bool ignore = false;
|
||||||
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
|
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore, addr);
|
||||||
if (ignore == true) {
|
if (ignore == true) {
|
||||||
// persist conn already release by server
|
// persist conn already release by server
|
||||||
STransMsg resp;
|
STransMsg resp;
|
||||||
|
@ -1377,11 +1397,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (conn == NULL && cliPreCheckSessionLimitForMsg(pThrd, addr, pMsg) != 0) {
|
||||||
if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, ip, port)) {
|
|
||||||
tGTrace("%s, msg %s cancel to send, reason: %s", pTransInst->label, TMSG_INFO(pMsg->msg.msgType),
|
|
||||||
tstrerror(TSDB_CODE_RPC_MAX_SESSIONS));
|
|
||||||
destroyCmsg(pMsg);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1398,13 +1414,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
||||||
transQueuePush(&conn->cliMsgs, pMsg);
|
transQueuePush(&conn->cliMsgs, pMsg);
|
||||||
|
|
||||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
conn->ip = strdup(addr);
|
||||||
char* fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet);
|
|
||||||
uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
|
||||||
CONN_CONSTRUCT_HASH_KEY(key, fqdn, port);
|
|
||||||
|
|
||||||
conn->ip = strdup(key);
|
|
||||||
|
|
||||||
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
|
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
|
||||||
if (ipaddr == 0xffffffff) {
|
if (ipaddr == 0xffffffff) {
|
||||||
uv_timer_stop(conn->timer);
|
uv_timer_stop(conn->timer);
|
||||||
|
@ -1453,6 +1463,15 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
cliHandleFastFail(conn, ret);
|
cliHandleFastFail(conn, ret);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip));
|
||||||
|
if (list == NULL || *list == NULL) {
|
||||||
|
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||||
|
nList->numOfConn++;
|
||||||
|
QUEUE_INIT(&nList->msgQ);
|
||||||
|
taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*));
|
||||||
|
}
|
||||||
|
|
||||||
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
|
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
|
||||||
}
|
}
|
||||||
tGTrace("%s conn %p ready", pTransInst->label, conn);
|
tGTrace("%s conn %p ready", pTransInst->label, conn);
|
||||||
|
@ -1833,8 +1852,7 @@ static SCliThrd* createThrdObj(void* trans) {
|
||||||
pThrd->destroyAhandleFp = pTransInst->destroyFp;
|
pThrd->destroyAhandleFp = pTransInst->destroyFp;
|
||||||
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
|
pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK);
|
|
||||||
|
|
||||||
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
@ -1865,7 +1883,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
|
||||||
taosMemoryFree(pThrd->loop);
|
taosMemoryFree(pThrd->loop);
|
||||||
taosHashCleanup(pThrd->fqdn2ipCache);
|
taosHashCleanup(pThrd->fqdn2ipCache);
|
||||||
taosHashCleanup(pThrd->failFastCache);
|
taosHashCleanup(pThrd->failFastCache);
|
||||||
taosHashCleanup(pThrd->connLimitCache);
|
|
||||||
|
|
||||||
void** pIter = taosHashIterate(pThrd->batchCache, NULL);
|
void** pIter = taosHashIterate(pThrd->batchCache, NULL);
|
||||||
while (pIter != NULL) {
|
while (pIter != NULL) {
|
||||||
|
@ -1884,6 +1901,23 @@ static void destroyThrdObj(SCliThrd* pThrd) {
|
||||||
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
|
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
|
||||||
}
|
}
|
||||||
taosHashCleanup(pThrd->batchCache);
|
taosHashCleanup(pThrd->batchCache);
|
||||||
|
|
||||||
|
pIter = taosHashIterate(pThrd->connLimitCache, NULL);
|
||||||
|
while (pIter != NULL) {
|
||||||
|
SMsgList* list = (SMsgList*)(*pIter);
|
||||||
|
while (!QUEUE_IS_EMPTY(&list->msgQ)) {
|
||||||
|
queue* h = QUEUE_HEAD(&list->msgQ);
|
||||||
|
QUEUE_REMOVE(h);
|
||||||
|
|
||||||
|
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||||
|
destroyCmsg(pMsg);
|
||||||
|
}
|
||||||
|
taosMemoryFree(list);
|
||||||
|
|
||||||
|
pIter = (void**)taosHashIterate(pThrd->connLimitCache, pIter);
|
||||||
|
}
|
||||||
|
taosHashCleanup(pThrd->connLimitCache);
|
||||||
|
|
||||||
taosMemoryFree(pThrd);
|
taosMemoryFree(pThrd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue