From 45a012bdf5b7e7ac3bf6c97a3b73dd7d0da387aa Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 25 Feb 2023 13:18:53 +0800 Subject: [PATCH] fix: limit session num --- source/libs/transport/src/transCli.c | 296 ++++++++++++++------------- 1 file changed, 150 insertions(+), 146 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 76c4ff46f3..9f8637aa05 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -14,9 +14,16 @@ #include "transComm.h" +typedef struct { + int32_t numOfConn; + queue msgQ; +} SMsgList; + typedef struct SConnList { - queue conns; - int32_t size; + queue conns; + int32_t size; + SMsgList* list; + void* pThrd; } SConnList; typedef struct { @@ -76,10 +83,6 @@ typedef struct SCliConn { } SCliConn; -typedef struct { - int32_t numOfConn; - queue msgQ; -} SMsgList; typedef struct SCliMsg { STransConnCtx* ctx; STransMsg msg; @@ -115,7 +118,6 @@ typedef struct SCliThrd { SCvtAddr cvtAddr; SHashObj* failFastCache; - SHashObj* connLimitCache; SHashObj* batchCache; SCliMsg* stopMsg; @@ -141,7 +143,7 @@ typedef struct { // add expire timeout and capacity limit static void* createConnPool(int size); static void* destroyConnPool(void* pool); -static SCliConn* getConnFromPool(void* pool, char* addr); +static SCliConn* getConnFromPool(SCliThrd* thread, char* key); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); @@ -181,8 +183,8 @@ static void cliSend(SCliConn* pConn); static void cliSendBatch(SCliConn* pConn); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); -static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr); -static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg); +static void doFreeTimeoutMsg(void* param); +static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg); // cli util func static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); @@ -200,6 +202,7 @@ static void cliHandleExcept(SCliConn* conn); static void cliReleaseUnfinishedMsg(SCliConn* conn); static void cliHandleFastFail(SCliConn* pConn, int status); +static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd); // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); @@ -546,20 +549,38 @@ void* createConnPool(int size) { } void* destroyConnPool(void* pool) { SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); + SCliThrd* pThrd = connList->pThrd; while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conns)) { queue* h = QUEUE_HEAD(&connList->conns); SCliConn* c = QUEUE_DATA(h, SCliConn, q); cliDestroyConn(c, true); } + + SMsgList* msglist = connList->list; + while (!QUEUE_IS_EMPTY(&msglist->msgQ)) { + queue* h = QUEUE_HEAD(&msglist->msgQ); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + + transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); + pMsg->ctx->task = NULL; + + doNotifyApp(pMsg, pThrd); + } + taosMemoryFree(msglist); + connList = taosHashIterate((SHashObj*)pool, connList); } taosHashCleanup(pool); return NULL; } -static SCliConn* getConnFromPool(void* pool, char* key) { +static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key) { + void* pool = pThrd->pool; SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + STrans* pTranInst = pThrd->pTransInst; if (plist == NULL) { SConnList list = {0}; taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); @@ -568,7 +589,76 @@ static SCliConn* getConnFromPool(void* pool, char* key) { QUEUE_INIT(&plist->conns); } + SMsgList* msglist = plist->list; + if (QUEUE_IS_EMPTY(&plist->conns) && msglist->numOfConn >= pTranInst->connLimitNum) { + return NULL; + } + + plist->size -= 1; + queue* h = QUEUE_HEAD(&plist->conns); + SCliConn* conn = QUEUE_DATA(h, SCliConn, q); + conn->status = ConnNormal; + QUEUE_REMOVE(&conn->q); + QUEUE_INIT(&conn->q); + + if (conn->task != NULL) { + transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); + conn->task = NULL; + } + return conn; +} + +static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { + void* pool = pThrd->pool; + STrans* pTransInst = pThrd->pTransInst; + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + if (plist == NULL) { + SConnList list = {0}; + + SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); + nList->numOfConn++; + QUEUE_INIT(&nList->msgQ); + list.list = nList; + + taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); + + plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + QUEUE_INIT(&plist->conns); + } + + SMsgList* list = plist->list; + // no avaliable conn in pool if (QUEUE_IS_EMPTY(&plist->conns)) { + if ((list)->numOfConn >= pTransInst->connLimitNum) { + STraceId* trace = &(*pMsg)->msg.info.traceId; + + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + arg->param1 = *pMsg; + arg->param2 = pThrd; + (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + + tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); + + QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); + *pMsg = NULL; + } else { + // send msg in delay queue + if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) { + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + arg->param1 = *pMsg; + arg->param2 = pThrd; + (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + + QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); + queue* h = QUEUE_HEAD(&(list)->msgQ); + QUEUE_REMOVE(h); + SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q); + + *pMsg = ans; + transDQCancel(pThrd->waitConnQueue, ans->ctx->task); + } + list->numOfConn++; + } return NULL; } @@ -604,29 +694,30 @@ static void addConnToPool(void* pool, SCliConn* conn) { cliDestroyConnMsgs(conn, false); - conn->status = ConnInPool; - - SMsgList** msglist = taosHashGet(thrd->connLimitCache, conn->ip, strlen(conn->ip)); - if (msglist != NULL && *msglist != NULL) { - if (!QUEUE_IS_EMPTY(&(*msglist)->msgQ)) { - queue* h = QUEUE_HEAD(&(*msglist)->msgQ); - QUEUE_REMOVE(h); - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - conn->status = ConnNormal; - transDQCancel(thrd->waitConnQueue, pMsg->ctx->task); - transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); - transQueuePush(&conn->cliMsgs, pMsg); - cliSend(conn); - return; - } - } - if (conn->list == NULL) { - tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip)); - } else { - tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); } + + SConnList* pList = conn->list; + SMsgList* msgList = pList->list; + if (!QUEUE_IS_EMPTY(&msgList->msgQ)) { + queue* h = QUEUE_HEAD(&(msgList)->msgQ); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + + transDQCancel(thrd->waitConnQueue, pMsg->ctx->task); + pMsg->ctx->task = NULL; + + transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); + transQueuePush(&conn->cliMsgs, pMsg); + + conn->status = ConnNormal; + cliSend(conn); + return; + } + + conn->status = ConnInPool; QUEUE_PUSH(&conn->list->conns, &conn->q); conn->list->size += 1; @@ -752,8 +843,19 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { static void cliDestroyConn(SCliConn* conn, bool clear) { SCliThrd* pThrd = conn->hostThrd; tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); + QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); + + if (conn->list != NULL) { + SConnList* connList = conn->list; + connList->list->numOfConn--; + } else { + SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip)); + connList->list->numOfConn--; + } + conn->list = NULL; + transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; @@ -788,10 +890,6 @@ static void cliDestroy(uv_handle_t* handle) { conn->timer->data = NULL; conn->timer = NULL; } - SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); - if (list != NULL && *list != NULL) { - (*list)->numOfConn--; - } atomic_sub_fetch_32(&pThrd->connCount, 1); @@ -1027,9 +1125,9 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port); - SCliConn* conn = getConnFromPool(pThrd->pool, key); + SCliConn* conn = getConnFromPool(pThrd, key); - if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, key)) { + if (conn == NULL) { tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen, pBatch->batchSize); cliDestroyBatch(pBatch); @@ -1085,16 +1183,6 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { cliHandleFastFail(conn, -1); return; } - - SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); - if (list == NULL || *list == NULL) { - SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); - nList->numOfConn++; - QUEUE_INIT(&nList->msgQ); - taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*)); - } else { - (*list)->numOfConn++; - } uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); return; } @@ -1246,20 +1334,6 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { pThrd->stopMsg = pMsg; return; } - void** pIter = taosHashIterate(pThrd->connLimitCache, NULL); - while (pIter != NULL) { - SMsgList* list = (SMsgList*)(*pIter); - while (!QUEUE_IS_EMPTY(&list->msgQ)) { - queue* h = QUEUE_HEAD(&list->msgQ); - QUEUE_REMOVE(h); - - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); - - doNotifyApp(pMsg, pThrd); - } - pIter = (void**)taosHashIterate(pThrd->connLimitCache, pIter); - } pThrd->stopMsg = NULL; pThrd->quit = true; tDebug("cli work thread %p start to quit", pThrd); @@ -1298,11 +1372,11 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); } -SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { - STransConnCtx* pCtx = pMsg->ctx; +SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { + STransConnCtx* pCtx = (*pMsg)->ctx; SCliConn* conn = NULL; - int64_t refId = (int64_t)(pMsg->msg.info.handle); + int64_t refId = (int64_t)((*pMsg)->msg.info.handle); if (refId != 0) { SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { @@ -1312,7 +1386,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { } else { conn = exh->handle; if (conn == NULL) { - conn = getConnFromPool(pThrd->pool, addr); + conn = getConnFromPool2(pThrd, addr, pMsg); if (conn != NULL) specifyConnRef(conn, true, refId); } transReleaseExHandle(transGetRefMgt(), refId); @@ -1320,7 +1394,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { return conn; }; - conn = getConnFromPool(pThrd->pool, addr); + conn = getConnFromPool2(pThrd, addr, pMsg); if (conn != NULL) { tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); } else { @@ -1378,19 +1452,6 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { return; } -static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr) { - STrans* pTransInst = pThrd->pTransInst; - - SMsgList** list = taosHashGet(pThrd->connLimitCache, addr, strlen(addr)); - if (list == NULL || *list == NULL) { - return 0; - } - - if ((*list)->numOfConn >= pTransInst->connLimitNum) { - return -1; - } - return 0; -} static void doFreeTimeoutMsg(void* param) { STaskArg* arg = param; SCliMsg* pMsg = arg->param1; @@ -1403,48 +1464,24 @@ static void doFreeTimeoutMsg(void* param) { doNotifyApp(pMsg, pThrd); taosMemoryFree(arg); } -static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg) { - STrans* pTransInst = pThrd->pTransInst; - - SMsgList** list = taosHashGet(pThrd->connLimitCache, addr, strlen(addr)); - if (list == NULL || *list == NULL) { - return 0; - } - - if ((*list)->numOfConn >= pTransInst->connLimitNum) { - STraceId* trace = &pMsg->msg.info.traceId; - - STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); - - arg->param1 = pMsg; - arg->param2 = pThrd; - - pMsg->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); - tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); - QUEUE_PUSH(&(*list)->msgQ, &pMsg->q); - return -1; - } - return 0; -} void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { - STrans* pTransInst = pThrd->pTransInst; - STransConnCtx* pCtx = pMsg->ctx; - STraceId* trace = &pMsg->msg.info.traceId; + STrans* pTransInst = pThrd->pTransInst; + STraceId* trace = &pMsg->msg.info.traceId; - cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); - if (!EPSET_IS_VALID(&pCtx->epSet)) { + cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); + if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) { tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); destroyCmsg(pMsg); return; } - char* fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet); - uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet); + uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet); char addr[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port); bool ignore = false; - SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore, addr); + SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr); if (ignore == true) { // persist conn already release by server STransMsg resp; @@ -1455,12 +1492,12 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); return; } - if (conn == NULL && cliPreCheckSessionLimitForMsg(pThrd, addr, pMsg) != 0) { + if (conn == NULL && pMsg == NULL) { return; } if (conn != NULL) { - transCtxMerge(&conn->ctx, &pCtx->appCtx); + transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); cliSend(conn); } else { @@ -1469,7 +1506,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { int64_t refId = (int64_t)pMsg->msg.info.handle; if (refId != 0) specifyConnRef(conn, true, refId); - transCtxMerge(&conn->ctx, &pCtx->appCtx); + transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); conn->ip = strdup(addr); @@ -1521,17 +1558,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { cliHandleFastFail(conn, ret); return; } - - SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); - if (list == NULL || *list == NULL) { - SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); - nList->numOfConn++; - QUEUE_INIT(&nList->msgQ); - taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*)); - } else { - (*list)->numOfConn++; - } - uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); } tGTrace("%s conn %p ready", pTransInst->label, conn); @@ -1914,7 +1940,6 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -1964,27 +1989,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } taosHashCleanup(pThrd->batchCache); - - pIter = taosHashIterate(pThrd->connLimitCache, NULL); - while (pIter != NULL) { - SMsgList* list = (SMsgList*)(*pIter); - while (!QUEUE_IS_EMPTY(&list->msgQ)) { - queue* h = QUEUE_HEAD(&list->msgQ); - QUEUE_REMOVE(h); - - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - - if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) { - pThrd->destroyAhandleFp(pMsg->ctx->ahandle); - } - destroyCmsg(pMsg); - } - taosMemoryFree(list); - - pIter = (void**)taosHashIterate(pThrd->connLimitCache, pIter); - } - taosHashCleanup(pThrd->connLimitCache); - taosMemoryFree(pThrd); }