diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 16f0de2b00..59ccaf54f6 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -72,6 +72,7 @@ extern int32_t tsTagFilterResCacheSize; extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcSessions; extern int32_t tsShareConnLimit; +extern int32_t tsReadTimeout; extern int32_t tsTimeToGetAvailableConn; extern int32_t tsKeepAliveIdle; extern int32_t tsNumOfCommitThreads; diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 2f6b23a594..cfa3f44f7f 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -132,6 +132,7 @@ typedef struct SRpcInit { int8_t shareConn; // 0: no share, 1. share int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait int8_t startReadTimer; + int64_t readTimeout; // s void *parent; } SRpcInit; @@ -151,6 +152,7 @@ typedef struct { SHashObj *args; SRpcBrokenlinkVal brokenVal; void (*freeFunc)(const void *arg); + int64_t st; } SRpcCtx; int32_t rpcInit(); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 1f5f5499a8..51027ca7d3 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -373,6 +373,7 @@ int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread, rpcInit.shareConnLimit = tsShareConnLimit; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.startReadTimer = 1; + rpcInit.readTimeout = tsReadTimeout; int32_t code = taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 2564d2d489..341a462a84 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2570,6 +2570,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + rpcInit.readTimeout = tsReadTimeout; if (TSDB_CODE_SUCCESS != taosVersionStrToInt(version, &(rpcInit.compatibilityVer))) { tscError("faild to convert taos version from str to int, errcode:%s", terrstr()); goto _OVER; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index db9228477d..6087af8f65 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -57,6 +57,7 @@ int32_t tsShellActivityTimer = 3; // second int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcSessions = 30000; int32_t tsShareConnLimit = 8; +int32_t tsReadTimeout = 128; int32_t tsTimeToGetAvailableConn = 500000; int32_t tsKeepAliveIdle = 60; @@ -616,6 +617,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { tsShareConnLimit = TRANGE(tsShareConnLimit, 1, 256); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "shareConnLimit", tsShareConnLimit, 1, 256, CFG_SCOPE_BOTH, CFG_DYN_NONE)); + tsReadTimeout = TRANGE(tsReadTimeout, 0, 24 * 3600); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "readTimeout", tsShareConnLimit, 0, 24 * 3600, CFG_SCOPE_BOTH, CFG_DYN_NONE)); + tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 10000000); TAOS_CHECK_RETURN( cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH, CFG_DYN_NONE)); @@ -895,6 +899,13 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } + pItem = cfgGetItem(pCfg, "readTimeout"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsShareConnLimit = TRANGE(tsShareConnLimit, 64, 24 * 3600); + pItem->i32 = tsShareConnLimit; + pItem->stype = stype; + } + pItem = cfgGetItem(pCfg, "timeToGetAvailableConn"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000); @@ -1272,6 +1283,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "shareConnLimit"); tsShareConnLimit = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "readTimeout"); + tsReadTimeout = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "timeToGetAvailableConn"); tsTimeToGetAvailableConn = pItem->i32; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index c01e016e0c..9bf3afff76 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -409,6 +409,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.notWaitAvaliableConn = 0; rpcInit.startReadTimer = 1; + rpcInit.readTimeout = tsReadTimeout; if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) { dError("failed to convert version string:%s to int", version); @@ -457,6 +458,7 @@ int32_t dmInitStatusClient(SDnode *pDnode) { rpcInit.shareConnLimit = tsShareConnLimit * 2; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.startReadTimer = 1; + rpcInit.readTimeout = 0; if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) { dError("failed to convert version string:%s to int", version); @@ -506,6 +508,7 @@ int32_t dmInitSyncClient(SDnode *pDnode) { rpcInit.shareConnLimit = tsShareConnLimit * 8; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.startReadTimer = 1; + rpcInit.readTimeout = tsReadTimeout; if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) { dError("failed to convert version string:%s to int", version); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index cf0ccd9fb2..39afb29342 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -79,6 +79,7 @@ typedef struct { int64_t refId; int8_t shareConn; int8_t startReadTimer; + int64_t readTimeout; TdThreadMutex mutex; } SRpcInfo; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 862f3d0adb..a479920360 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -84,6 +84,11 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->shareConnLimit = BUFFER_LIMIT; } + pRpc->readTimeout = pInit->readTimeout; + if (pRpc->readTimeout <= 0) { + pRpc->readTimeout = INT64_MAX; + } + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; if (pRpc->numOfThreads <= 0) { pRpc->numOfThreads = 1; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c7554f02d2..27cbc313bf 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -27,6 +27,7 @@ typedef struct { typedef struct SConnList { queue conns; int32_t size; + int32_t totaSize; } SConnList; typedef struct { @@ -65,7 +66,6 @@ typedef struct SCliConn { SConnBuffer readBuf; STransQueue reqsToSend; STransQueue reqsSentOut; - SHashObj* pQueryTable; queue q; SConnList* list; @@ -162,6 +162,8 @@ typedef struct SCliThrd { int32_t (*notifyExceptCb)(void* arg, SCliReq* pReq, STransMsg* pResp); SHashObj* pIdConnTable; // + + SArray* pQIdBuf; // tmp buf to avoid alloc buf; } SCliThrd; typedef struct SCliObj { @@ -267,6 +269,7 @@ static int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq); static int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead); static int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp); static int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq); +static int32_t cliHandleState_mayUpdateStateTime(SCliConn* pConn, SCliReq* pReq); int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); @@ -285,11 +288,17 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); static FORCE_INLINE int32_t destroyAllReqs(SCliConn* SCliConn); +typedef struct SListFilterArg { + int64_t id; + STrans* pInst; +} SListFilterArg; + static FORCE_INLINE bool filterAllReq(void* key, void* arg); static FORCE_INLINE bool filerBySeq(void* key, void* arg); static FORCE_INLINE bool filterByQid(void* key, void* arg); static FORCE_INLINE bool filterToDebug_timeoutMsg(void* key, void* arg); static FORCE_INLINE bool filterToRmTimoutReq(void* key, void* arg); +static FORCE_INLINE bool filterTimeoutReq(void* key, void* arg); typedef struct { void* p; @@ -378,7 +387,7 @@ void cliResetConnTimer(SCliConn* conn) { } } -void cliConnMayUpdateTimer(SCliConn* conn, int timeout) { +void cliConnMayUpdateTimer(SCliConn* conn, int64_t timeout) { SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; if (pInst->startReadTimer == 0) { @@ -550,6 +559,7 @@ int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, ST } STransCtx* pCtx = taosHashGet(conn->pQTable, &qId, sizeof(qId)); + pCtx->st = taosGetTimestampUs(); if (pCtx == 0) { return TSDB_CODE_RPC_NO_STATE; } @@ -624,6 +634,12 @@ void cliHandleResp(SCliConn* conn) { } return; } + } else { + code = cliHandleState_mayUpdateStateTime(conn, pReq); + if (code != 0) { + tDebug("%s conn %p failed to update state time qid:" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId, + tstrerror(code)); + } } code = cliBuildRespFromCont(pReq, &resp, pHead); @@ -642,7 +658,7 @@ void cliHandleResp(SCliConn* conn) { } cliConnCheckTimoutMsg(conn); - cliConnMayUpdateTimer(conn, READ_TIMEOUT); + cliConnMayUpdateTimer(conn, pInst->readTimeout); } void cliConnTimeout(uv_timer_t* handle) { @@ -659,10 +675,11 @@ void cliConnTimeout(uv_timer_t* handle) { } bool filterToRmTimoutReq(void* key, void* arg) { - SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); + SListFilterArg* filterArg = arg; + SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) { - int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000); - if (elapse > READ_TIMEOUT) { + int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000000); + if (filterArg && (elapse > filterArg->pInst->readTimeout)) { return false; } else { return false; @@ -672,10 +689,11 @@ bool filterToRmTimoutReq(void* key, void* arg) { } bool filterToDebug_timeoutMsg(void* key, void* arg) { - SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); + SListFilterArg* filterArg = arg; + SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) { - int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000); - if (elapse > READ_TIMEOUT) { + int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000000); + if (filterArg && elapse > filterArg->pInst->readTimeout) { tWarn("req %s timeout, elapse:%" PRId64 "ms", TMSG_INFO(pReq->msg.msgType), elapse); return false; } @@ -702,7 +720,8 @@ void cliConnCheckTimoutMsg(SCliConn* conn) { } QUEUE_INIT(&set); - transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmTimoutReq, NULL, &set, -1); + SListFilterArg arg = {.id = 0, .pInst = pInst}; + transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmTimoutReq, &arg, &set, -1); while (!QUEUE_IS_EMPTY(&set)) { queue* el = QUEUE_HEAD(&set); @@ -804,6 +823,7 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p if (plist->size >= pInst->connLimitNum) { return TSDB_CODE_RPC_MAX_SESSIONS; } + plist->totaSize += 1; return TSDB_CODE_RPC_NETWORK_BUSY; } @@ -1086,6 +1106,11 @@ static void cliDestroy(uv_handle_t* handle) { if (code != 0) { tDebug("%s conn %p failed to del conn from heapcach since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); } + + if (conn->list) { + conn->list->totaSize -= 1; + conn->list = NULL; + } taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->stream); taosMemoryFree(conn->ipStr); @@ -1107,6 +1132,56 @@ static void cliDestroy(uv_handle_t* handle) { static FORCE_INLINE bool filterAllReq(void* e, void* arg) { return 1; } +static void notifyAndDestroyReq(SCliConn* pConn, SCliReq* pReq, int32_t code) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; + + SReqCtx* pCtx = pReq ? pReq->ctx : NULL; + STransMsg resp = {0}; + resp.code = (pConn->connnected ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL); + if (code != 0) { + resp.code = code; + } + + resp.msgType = pReq ? pReq->msg.msgType + 1 : 0; + resp.info.cliVer = pInst->compatibilityVer; + resp.info.ahandle = pCtx ? pCtx->ahandle : 0; + resp.info.handle = pReq->msg.info.handle; + if (pReq) { + resp.info.traceId = pReq->msg.info.traceId; + } + + STraceId* trace = &resp.info.traceId; + tDebug("%s conn %p notify user and destroy msg %s since %s", CONN_GET_INST_LABEL(pConn), pConn, + TMSG_INFO(pReq->msg.msgType), tstrerror(resp.code)); + + // handle noresp and inter manage msg + if (pCtx == NULL || REQUEST_NO_RESP(&pReq->msg)) { + tDebug("%s conn %p destroy msg directly since %s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msg.msgType), + tstrerror(resp.code)); + destroyReq(pReq); + return; + } + + pReq->seq = 0; + code = cliNotifyCb(pConn, pReq, &resp); + if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { + return; + } else { + // already notify user + destroyReq(pReq); + } +} + +static FORCE_INLINE void destroyReqInQueue(SCliConn* conn, queue* set) { + while (!QUEUE_IS_EMPTY(&set)) { + queue* el = QUEUE_HEAD(&set); + QUEUE_REMOVE(el); + + SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); + notifyAndDestroyReq(conn, pReq, 0); + } +} static FORCE_INLINE int32_t destroyAllReqs(SCliConn* conn) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; @@ -1119,38 +1194,7 @@ static FORCE_INLINE int32_t destroyAllReqs(SCliConn* conn) { transQueueRemoveByFilter(&conn->reqsSentOut, filterAllReq, NULL, &set, -1); transQueueRemoveByFilter(&conn->reqsToSend, filterAllReq, NULL, &set, -1); - while (!QUEUE_IS_EMPTY(&set)) { - queue* el = QUEUE_HEAD(&set); - QUEUE_REMOVE(el); - - SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); - - SReqCtx* pCtx = pReq ? pReq->ctx : NULL; - STransMsg resp = {0}; - resp.code = (conn->connnected ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL); - resp.msgType = pReq ? pReq->msg.msgType + 1 : 0; - resp.info.cliVer = pInst->compatibilityVer; - resp.info.ahandle = pCtx ? pCtx->ahandle : 0; - resp.info.handle = pReq->msg.info.handle; - if (pReq) { - resp.info.traceId = pReq->msg.info.traceId; - } - - // handle noresp and inter manage msg - if (pCtx == NULL || REQUEST_NO_RESP(&pReq->msg)) { - destroyReq(pReq); - continue; - } - - pReq->seq = 0; - code = cliNotifyCb(conn, pReq, &resp); - if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { - continue; - } else { - // already notify user - destroyReq(pReq); - } - } + destroyReqInQueue(conn, &set); return 0; } static void cliHandleException(SCliConn* conn) { @@ -1166,6 +1210,10 @@ static void cliHandleException(SCliConn* conn) { cliDestroyAllQidFromThrd(conn); QUEUE_REMOVE(&conn->q); + if (conn->list) { + conn->list->totaSize -= 1; + conn->list = NULL; + } if (conn->registered) { int8_t ref = transGetRefCount(conn); @@ -1203,6 +1251,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) { SCliConn* conn = wrapper->arg; SCliThrd* pThrd = conn->hostThrd; + STrans* pInst = pThrd->pInst; freeWReqToWQ(&conn->wq, wrapper); @@ -1217,7 +1266,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) { return; } - cliConnMayUpdateTimer(conn, READ_TIMEOUT); + cliConnMayUpdateTimer(conn, pInst->readTimeout); if (conn->readerStart == 0) { code = uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); if (code != 0) { @@ -1709,6 +1758,16 @@ static void doFreeTimeoutMsg(void* param) { taosMemoryFree(arg); } +int32_t cliHandleState_mayUpdateStateTime(SCliConn* pConn, SCliReq* pReq) { + int64_t qid = pReq->msg.info.qId; + if (qid > 0) { + STransCtx* pUserCtx = taosHashGet(pConn->pQTable, &qid, sizeof(qid)); + if (pUserCtx != NULL) { + pUserCtx->st = taosGetTimestampUs(); + } + } + return 0; +} int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) { int32_t code = 0; int64_t qid = pReq->msg.info.qId; @@ -1721,14 +1780,17 @@ int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) { STransCtx* pUserCtx = taosHashGet(pConn->pQTable, &qid, sizeof(qid)); if (pUserCtx == NULL) { + pCtx->userCtx.st = taosGetTimestampUs(); code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx)); tDebug("%s conn %p succ to add statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); } else { transCtxMerge(pUserCtx, &pCtx->userCtx); + pUserCtx->st = taosGetTimestampUs(); tDebug("%s conn %p succ to update statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); } return 0; } + int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { int32_t code = 0; int64_t qid = pReq->msg.info.qId; @@ -2247,6 +2309,11 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } + pThrd->pQIdBuf = taosArrayInit(8, sizeof(int64_t)); + if (pThrd->pQIdBuf == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } + pThrd->initCb = initCb; pThrd->notifyCb = notfiyCb; pThrd->notifyExceptCb = notifyExceptCb; @@ -2281,6 +2348,7 @@ _end: taosHashCleanup(pThrd->failFastCache); taosHashCleanup(pThrd->batchCache); taosHashCleanup(pThrd->pIdConnTable); + taosArrayDestroy(pThrd->pQIdBuf); taosMemoryFree(pThrd); } @@ -2343,6 +2411,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosHashCleanup(pThrd->pIdConnTable); taosMemoryFree(pThrd->pCvtAddr); + taosArrayDestroy(pThrd->pQIdBuf); taosMemoryFree(pThrd); } @@ -3442,12 +3511,119 @@ static int32_t getOrCreateHeap(SHashObj* pConnHeapCache, char* key, SHeap** pHea return code; } -static FORCE_INLINE int8_t shouldSWitchToOtherConn(STrans* pInst, int32_t reqNum, int32_t sentNum, int32_t stateNum) { - int32_t total = reqNum + sentNum; - if (stateNum >= pInst->shareConnLimit) { - return 1; +bool filterTimeoutReq(void* key, void* arg) { + SListFilterArg* listArg = arg; + if (listArg == NULL) { + return false; } - if (total >= pInst->shareConnLimit) { + + int64_t st = listArg->id; + SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); + if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) { + int64_t elapse = ((st - pReq->st) / 1000000); + if (listArg && elapse > listArg->pInst->readTimeout) { + return true; + } else { + return false; + } + } + return false; +} + +static void cliConnRemoveTimoutQidMsg(SCliConn* pConn, int64_t* st, queue* set) { + int32_t code = 0; + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; + + SArray* pQIdBuf = pThrd->pQIdBuf; + void* pIter = taosHashIterate(pConn->pQTable, NULL); + while (pIter) { + STransCtx* pCtx = (STransCtx*)pIter; + int64_t* qid = taosHashGetKey(pIter, NULL); + + if (((*st - pCtx->st) / 1000000) > pInst->readTimeout) { + code = taosHashRemove(pThrd->pIdConnTable, qid, sizeof(*qid)); + if (code != 0) { + tError("%s conn %p failed to remove state qid:" PRId64 " since %s", tstrerror(code)); + } + + transReleaseExHandle(transGetRefMgt(), *qid); + transRemoveExHandle(transGetRefMgt(), *qid); + + if (taosArrayPush(pQIdBuf, qid) == NULL) { + code = terrno; + tError("%s conn %p failed to add qid:" PRId64 " since %s", tstrerror(code)); + break; + } + } + taosHashIterate(pConn->pQTable, pIter); + } + + for (int32_t i = 0; i < taosArrayGetSize(pQIdBuf); i++) { + int64_t* qid = taosArrayGet(pQIdBuf, i); + transQueueRemoveByFilter(&pConn->reqsSentOut, filterByQid, qid, &set, -1); + transQueueRemoveByFilter(&pConn->reqsToSend, filterByQid, qid, &set, -1); + + STransCtx* p = taosHashGet(pConn->pQTable, qid, sizeof(*qid)); + transCtxCleanup(p); + code = taosHashRemove(pConn->pQTable, qid, sizeof(*qid)); + if (code != 0) { + tError("%s conn %p failed to drop ctx of qid:" PRId64 " since %s", tstrerror(code)); + } + } + + taosArrayClear(pQIdBuf); +} + +static void cliConnRemoveTimeoutNoQidMsg(SCliConn* pConn, int64_t* st, queue* set) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; + SListFilterArg arg = {.id = *st, .pInst = pInst}; + transQueueRemoveByFilter(&pConn->reqsToSend, filterTimeoutReq, st, &set, -1); + return; +} + +static int8_t cliConnRemoveTimeoutMsg(SCliConn* pConn) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; + + queue set; + QUEUE_INIT(&set); + + int64_t now = taosGetTimestampUs(); + + cliConnRemoveTimoutQidMsg(pConn, &now, &set); + cliConnRemoveTimeoutNoQidMsg(pConn, &now, &set); + + if (QUEUE_IS_EMPTY(&set)) { + return 0; + } + tDebug("%s conn %p do remove timeout msg", pInst->label, pConn); + destroyReqInQueue(pConn, &set); + return 1; +} +static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; + + tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap, pConn->reqRefCnt); + int32_t reqsNum = transQueueSize(&pConn->reqsToSend); + int32_t reqsSentOut = transQueueSize(&pConn->reqsSentOut); + int32_t stateNum = taosHashGetSize(pConn->pQTable); + int32_t totalReqs = reqsNum + reqsSentOut; + + if (stateNum >= pInst->shareConnLimit || totalReqs >= pInst->shareConnLimit) { + if (pConn->list == NULL && pConn->dstAddr != NULL) { + pConn->list = taosHashGet((SHashObj*)pThrd->pool, pConn->dstAddr, strlen(pConn->dstAddr)); + } + if (pConn->list && pConn->list->totaSize >= pInst->connLimitNum / 4) { + tWarn("%s conn %p try to remove timeout msg since too many conn created", transLabel(pInst), pConn); + if (cliConnRemoveTimeoutMsg(pConn)) { + tDebug("%s conn %p succ to remove timeout msg", transLabel(pInst), pConn); + } + return 1; + } + // check req timeout or not return 1; } @@ -3487,13 +3663,7 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { tDebug("failed to get conn from heap cache for key:%s", key); return NULL; } else { - tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap, pConn->reqRefCnt); - int32_t reqsNum = transQueueSize(&pConn->reqsToSend); - int32_t reqsSentOut = transQueueSize(&pConn->reqsSentOut); - int32_t stateNum = taosHashGetSize(pConn->pQTable); - SCliThrd* pThrd = pConn->hostThrd; - STrans* pInst = pThrd->pInst; - if (shouldSWitchToOtherConn(pInst, reqsNum, reqsSentOut, stateNum)) { + if (shouldSWitchToOtherConn(pConn, key)) { logConnMissHit(pConn); return NULL; }