fix: limit session num

This commit is contained in:
yihaoDeng 2023-02-25 14:21:15 +08:00
parent d7dc176ebb
commit be17aa822b
1 changed files with 11 additions and 10 deletions

View File

@ -143,7 +143,7 @@ typedef struct {
// add expire timeout and capacity limit // add expire timeout and capacity limit
static void* createConnPool(int size); static void* createConnPool(int size);
static void* destroyConnPool(void* pool); static void* destroyConnPool(void* pool);
static SCliConn* getConnFromPool(SCliThrd* thread, char* key); static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed);
static void addConnToPool(void* pool, SCliConn* conn); static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param); static void doCloseIdleConn(void* param);
@ -577,7 +577,7 @@ void* destroyConnPool(void* pool) {
return NULL; return NULL;
} }
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key) { static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
void* pool = pThrd->pool; void* pool = pThrd->pool;
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
STrans* pTranInst = pThrd->pTransInst; STrans* pTranInst = pThrd->pTransInst;
@ -591,11 +591,12 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key) {
list.list = nList; list.list = nList;
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet((SHashObj*)pool, key, strlen(key)); return NULL;
} }
SMsgList* msglist = plist->list; SMsgList* msglist = plist->list;
if (QUEUE_IS_EMPTY(&plist->conns) && msglist->numOfConn >= pTranInst->connLimitNum) { if (QUEUE_IS_EMPTY(&plist->conns) && msglist->numOfConn >= pTranInst->connLimitNum) {
*exceed = true;
return NULL; return NULL;
} }
@ -1131,11 +1132,12 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
char key[TSDB_FQDN_LEN + 64] = {0}; char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port); CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port);
SCliConn* conn = getConnFromPool(pThrd, key); bool exceed = false;
SCliConn* conn = getConnFromPool(pThrd, key, &exceed);
if (conn == NULL) { if (conn == NULL && exceed) {
tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen, tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen,
pBatch->batchSize); pBatch->batchSize, pTransInst->connLimitNum);
cliDestroyBatch(pBatch); cliDestroyBatch(pBatch);
return; return;
} }
@ -1471,12 +1473,10 @@ static void doFreeTimeoutMsg(void* param) {
taosMemoryFree(arg); taosMemoryFree(arg);
} }
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
STraceId* trace = &pMsg->msg.info.traceId;
cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) { if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
destroyCmsg(pMsg); destroyCmsg(pMsg);
return; return;
} }
@ -1501,6 +1501,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
if (conn == NULL && pMsg == NULL) { if (conn == NULL && pMsg == NULL) {
return; return;
} }
STraceId* trace = &pMsg->msg.info.traceId;
if (conn != NULL) { if (conn != NULL) {
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);