add time out

This commit is contained in:
yihaoDeng 2024-09-30 16:56:14 +08:00
parent adb8251bd6
commit 7e11fdeed6
9 changed files with 253 additions and 55 deletions

View File

@ -72,6 +72,7 @@ extern int32_t tsTagFilterResCacheSize;
extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfRpcSessions;
extern int32_t tsShareConnLimit;
extern int32_t tsReadTimeout;
extern int32_t tsTimeToGetAvailableConn;
extern int32_t tsKeepAliveIdle;
extern int32_t tsNumOfCommitThreads;

View File

@ -132,6 +132,7 @@ typedef struct SRpcInit {
int8_t shareConn; // 0: no share, 1. share
int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait
int8_t startReadTimer;
int64_t readTimeout; // s
void *parent;
} SRpcInit;
@ -151,6 +152,7 @@ typedef struct {
SHashObj *args;
SRpcBrokenlinkVal brokenVal;
void (*freeFunc)(const void *arg);
int64_t st;
} SRpcCtx;
int32_t rpcInit();

View File

@ -373,6 +373,7 @@ int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread,
rpcInit.shareConnLimit = tsShareConnLimit;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.startReadTimer = 1;
rpcInit.readTimeout = tsReadTimeout;
int32_t code = taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
if (TSDB_CODE_SUCCESS != code) {

View File

@ -2570,6 +2570,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.readTimeout = tsReadTimeout;
if (TSDB_CODE_SUCCESS != taosVersionStrToInt(version, &(rpcInit.compatibilityVer))) {
tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
goto _OVER;

View File

@ -57,6 +57,7 @@ int32_t tsShellActivityTimer = 3; // second
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 30000;
int32_t tsShareConnLimit = 8;
int32_t tsReadTimeout = 128;
int32_t tsTimeToGetAvailableConn = 500000;
int32_t tsKeepAliveIdle = 60;
@ -616,6 +617,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
tsShareConnLimit = TRANGE(tsShareConnLimit, 1, 256);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "shareConnLimit", tsShareConnLimit, 1, 256, CFG_SCOPE_BOTH, CFG_DYN_NONE));
// Clamp the read timeout (seconds) and register it under "readTimeout".
// The registered value must be tsReadTimeout itself; the previous code passed
// tsShareConnLimit here (copy/paste from the item above), so the config item
// started out with the share-conn limit instead of the timeout.
tsReadTimeout = TRANGE(tsReadTimeout, 0, 24 * 3600);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "readTimeout", tsReadTimeout, 0, 24 * 3600, CFG_SCOPE_BOTH, CFG_DYN_NONE));
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 10000000);
TAOS_CHECK_RETURN(
cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH, CFG_DYN_NONE));
@ -895,6 +899,13 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype;
}
// When "readTimeout" was not set explicitly (still CFG_STYPE_DEFAULT), apply the
// server-side range to tsReadTimeout and mirror it into the config item.
// The previous code clamped and stored tsShareConnLimit here, which both clobbered
// the share-conn limit and left tsReadTimeout untouched.
// NOTE(review): the lower bound here is 64 while the item is registered with a
// minimum of 0 — confirm that the differing bounds are intentional.
pItem = cfgGetItem(pCfg, "readTimeout");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
  tsReadTimeout = TRANGE(tsReadTimeout, 64, 24 * 3600);
  pItem->i32 = tsReadTimeout;
  pItem->stype = stype;
}
pItem = cfgGetItem(pCfg, "timeToGetAvailableConn");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
@ -1272,6 +1283,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "shareConnLimit");
tsShareConnLimit = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "readTimeout");
tsReadTimeout = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "timeToGetAvailableConn");
tsTimeToGetAvailableConn = pItem->i32;

View File

@ -409,6 +409,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.notWaitAvaliableConn = 0;
rpcInit.startReadTimer = 1;
rpcInit.readTimeout = tsReadTimeout;
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
dError("failed to convert version string:%s to int", version);
@ -457,6 +458,7 @@ int32_t dmInitStatusClient(SDnode *pDnode) {
rpcInit.shareConnLimit = tsShareConnLimit * 2;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.startReadTimer = 1;
rpcInit.readTimeout = 0;
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
dError("failed to convert version string:%s to int", version);
@ -506,6 +508,7 @@ int32_t dmInitSyncClient(SDnode *pDnode) {
rpcInit.shareConnLimit = tsShareConnLimit * 8;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.startReadTimer = 1;
rpcInit.readTimeout = tsReadTimeout;
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
dError("failed to convert version string:%s to int", version);

View File

@ -79,6 +79,7 @@ typedef struct {
int64_t refId;
int8_t shareConn;
int8_t startReadTimer;
int64_t readTimeout;
TdThreadMutex mutex;
} SRpcInfo;

View File

@ -84,6 +84,11 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->shareConnLimit = BUFFER_LIMIT;
}
pRpc->readTimeout = pInit->readTimeout;
if (pRpc->readTimeout <= 0) {
pRpc->readTimeout = INT64_MAX;
}
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
if (pRpc->numOfThreads <= 0) {
pRpc->numOfThreads = 1;

View File

@ -27,6 +27,7 @@ typedef struct {
// List of connections for one destination; looked up from the client thread's
// pool by destination address (see shouldSWitchToOtherConn).
typedef struct SConnList {
queue conns;  // intrusive queue head of the connections held by this list
int32_t size;  // compared against pInst->connLimitNum when taking a conn from the pool
// Incremented in cliGetConnFromPool before it returns TSDB_CODE_RPC_NETWORK_BUSY and
// decremented when a connection that points at this list is destroyed or hits an
// exception; compared against connLimitNum / 4 to decide when to purge timed-out
// requests. NOTE(review): presumably the number of live connections created for
// this destination — confirm against the creation path.
int32_t totaSize;
} SConnList;
typedef struct {
@ -65,7 +66,6 @@ typedef struct SCliConn {
SConnBuffer readBuf;
STransQueue reqsToSend;
STransQueue reqsSentOut;
SHashObj* pQueryTable;
queue q;
SConnList* list;
@ -162,6 +162,8 @@ typedef struct SCliThrd {
int32_t (*notifyExceptCb)(void* arg, SCliReq* pReq, STransMsg* pResp);
SHashObj* pIdConnTable; // <qid, conn>
SArray* pQIdBuf; // tmp buf to avoid alloc buf;
} SCliThrd;
typedef struct SCliObj {
@ -267,6 +269,7 @@ static int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq);
static int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead);
static int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp);
static int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq);
static int32_t cliHandleState_mayUpdateStateTime(SCliConn* pConn, SCliReq* pReq);
int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn);
@ -285,11 +288,17 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
static FORCE_INLINE int32_t destroyAllReqs(SCliConn* SCliConn);
// Argument bundle passed as `arg` to the queue filter callbacks.
typedef struct SListFilterArg {
int64_t id;  // filter-specific value; filterTimeoutReq reads it as the reference timestamp in microseconds
STrans* pInst;  // transport instance; the timeout filters read pInst->readTimeout (seconds)
} SListFilterArg;
static FORCE_INLINE bool filterAllReq(void* key, void* arg);
static FORCE_INLINE bool filerBySeq(void* key, void* arg);
static FORCE_INLINE bool filterByQid(void* key, void* arg);
static FORCE_INLINE bool filterToDebug_timeoutMsg(void* key, void* arg);
static FORCE_INLINE bool filterToRmTimoutReq(void* key, void* arg);
static FORCE_INLINE bool filterTimeoutReq(void* key, void* arg);
typedef struct {
void* p;
@ -378,7 +387,7 @@ void cliResetConnTimer(SCliConn* conn) {
}
}
void cliConnMayUpdateTimer(SCliConn* conn, int timeout) {
void cliConnMayUpdateTimer(SCliConn* conn, int64_t timeout) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
if (pInst->startReadTimer == 0) {
@ -550,6 +559,7 @@ int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, ST
}
STransCtx* pCtx = taosHashGet(conn->pQTable, &qId, sizeof(qId));
if (pCtx == 0) {
  return TSDB_CODE_RPC_NO_STATE;
}
// Refresh the state's last-activity stamp only once the lookup is known to have
// succeeded; writing through pCtx before the NULL check dereferenced NULL
// whenever the qid had no registered state.
pCtx->st = taosGetTimestampUs();
@ -624,6 +634,12 @@ void cliHandleResp(SCliConn* conn) {
}
return;
}
} else {
// Refresh the last-activity time of the qid state this response belongs to.
code = cliHandleState_mayUpdateStateTime(conn, pReq);
if (code != 0) {
  // "%" was missing in front of PRId64, so qId was consumed by the trailing %s.
  tDebug("%s conn %p failed to update state time qid:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId,
         tstrerror(code));
}
}
code = cliBuildRespFromCont(pReq, &resp, pHead);
@ -642,7 +658,7 @@ void cliHandleResp(SCliConn* conn) {
}
cliConnCheckTimoutMsg(conn);
cliConnMayUpdateTimer(conn, READ_TIMEOUT);
cliConnMayUpdateTimer(conn, pInst->readTimeout);
}
void cliConnTimeout(uv_timer_t* handle) {
@ -659,10 +675,11 @@ void cliConnTimeout(uv_timer_t* handle) {
}
bool filterToRmTimoutReq(void* key, void* arg) {
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
SListFilterArg* filterArg = arg;
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) {
int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000);
if (elapse > READ_TIMEOUT) {
int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000000);
if (filterArg && (elapse > filterArg->pInst->readTimeout)) {
return false;
} else {
return false;
@ -672,10 +689,11 @@ bool filterToRmTimoutReq(void* key, void* arg) {
}
bool filterToDebug_timeoutMsg(void* key, void* arg) {
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
SListFilterArg* filterArg = arg;
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) {
int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000);
if (elapse > READ_TIMEOUT) {
int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000000);
if (filterArg && elapse > filterArg->pInst->readTimeout) {
tWarn("req %s timeout, elapse:%" PRId64 "ms", TMSG_INFO(pReq->msg.msgType), elapse);
return false;
}
@ -702,7 +720,8 @@ void cliConnCheckTimoutMsg(SCliConn* conn) {
}
QUEUE_INIT(&set);
transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmTimoutReq, NULL, &set, -1);
SListFilterArg arg = {.id = 0, .pInst = pInst};
transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmTimoutReq, &arg, &set, -1);
while (!QUEUE_IS_EMPTY(&set)) {
queue* el = QUEUE_HEAD(&set);
@ -804,6 +823,7 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p
if (plist->size >= pInst->connLimitNum) {
return TSDB_CODE_RPC_MAX_SESSIONS;
}
plist->totaSize += 1;
return TSDB_CODE_RPC_NETWORK_BUSY;
}
@ -1086,6 +1106,11 @@ static void cliDestroy(uv_handle_t* handle) {
if (code != 0) {
tDebug("%s conn %p failed to del conn from heapcach since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
}
if (conn->list) {
conn->list->totaSize -= 1;
conn->list = NULL;
}
taosMemoryFree(conn->dstAddr);
taosMemoryFree(conn->stream);
taosMemoryFree(conn->ipStr);
@ -1107,6 +1132,56 @@ static void cliDestroy(uv_handle_t* handle) {
static FORCE_INLINE bool filterAllReq(void* e, void* arg) { return 1; }
/*
 * Report a failed request to its owner and release it.
 *
 * Builds an error response for pReq and hands it to cliNotifyCb. Requests that
 * have no context or that expect no response are destroyed without notifying.
 *
 * pConn  connection the request was queued on.
 * pReq   request to fail; a NULL request is ignored.
 * code   error code to report; when 0 the code is derived from the connection
 *        state (TSDB_CODE_RPC_BROKEN_LINK if connected, otherwise
 *        TSDB_CODE_RPC_NETWORK_UNAVAIL).
 *
 * The request is destroyed here unless cliNotifyCb returns
 * TSDB_CODE_RPC_ASYNC_IN_PROCESS, in which case it is left alone.
 */
static void notifyAndDestroyReq(SCliConn* pConn, SCliReq* pReq, int32_t code) {
  // The previous version guarded some uses of pReq against NULL but dereferenced
  // it unconditionally elsewhere; bail out once, up front, instead.
  if (pReq == NULL) {
    return;
  }

  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pInst = pThrd->pInst;
  SReqCtx*  pCtx = pReq->ctx;

  STransMsg resp = {0};
  resp.code = (pConn->connnected ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL);
  if (code != 0) {
    resp.code = code;
  }

  resp.msgType = pReq->msg.msgType + 1;
  resp.info.cliVer = pInst->compatibilityVer;
  resp.info.ahandle = pCtx ? pCtx->ahandle : 0;
  resp.info.handle = pReq->msg.info.handle;
  resp.info.traceId = pReq->msg.info.traceId;

  STraceId* trace = &resp.info.traceId;
  tDebug("%s conn %p notify user and destroy msg %s since %s", CONN_GET_INST_LABEL(pConn), pConn,
         TMSG_INFO(pReq->msg.msgType), tstrerror(resp.code));

  // handle noresp and inter manage msg
  if (pCtx == NULL || REQUEST_NO_RESP(&pReq->msg)) {
    // The format previously had one specifier fewer than arguments, so the
    // message type was printed as the reason and the error string was dropped.
    tDebug("%s conn %p destroy msg %s directly since %s", CONN_GET_INST_LABEL(pConn), pConn,
           TMSG_INFO(pReq->msg.msgType), tstrerror(resp.code));
    destroyReq(pReq);
    return;
  }

  pReq->seq = 0;
  code = cliNotifyCb(pConn, pReq, &resp);
  if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
    return;
  }

  // already notify user
  destroyReq(pReq);
}
/*
 * Drain `set`: unlink every request queued on it and pass each one to
 * notifyAndDestroyReq with code 0.
 *
 * conn  connection the requests belong to.
 * set   head of the queue to drain.
 */
static FORCE_INLINE void destroyReqInQueue(SCliConn* conn, queue* set) {
  // `set` is already a pointer to the queue head. The previous code used `&set`,
  // which made the queue macros walk the address of this local parameter instead
  // of the caller's queue.
  while (!QUEUE_IS_EMPTY(set)) {
    queue* el = QUEUE_HEAD(set);
    QUEUE_REMOVE(el);

    SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
    notifyAndDestroyReq(conn, pReq, 0);
  }
}
static FORCE_INLINE int32_t destroyAllReqs(SCliConn* conn) {
int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd;
@ -1119,38 +1194,7 @@ static FORCE_INLINE int32_t destroyAllReqs(SCliConn* conn) {
transQueueRemoveByFilter(&conn->reqsSentOut, filterAllReq, NULL, &set, -1);
transQueueRemoveByFilter(&conn->reqsToSend, filterAllReq, NULL, &set, -1);
while (!QUEUE_IS_EMPTY(&set)) {
queue* el = QUEUE_HEAD(&set);
QUEUE_REMOVE(el);
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
SReqCtx* pCtx = pReq ? pReq->ctx : NULL;
STransMsg resp = {0};
resp.code = (conn->connnected ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL);
resp.msgType = pReq ? pReq->msg.msgType + 1 : 0;
resp.info.cliVer = pInst->compatibilityVer;
resp.info.ahandle = pCtx ? pCtx->ahandle : 0;
resp.info.handle = pReq->msg.info.handle;
if (pReq) {
resp.info.traceId = pReq->msg.info.traceId;
}
// handle noresp and inter manage msg
if (pCtx == NULL || REQUEST_NO_RESP(&pReq->msg)) {
destroyReq(pReq);
continue;
}
pReq->seq = 0;
code = cliNotifyCb(conn, pReq, &resp);
if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
continue;
} else {
// already notify user
destroyReq(pReq);
}
}
destroyReqInQueue(conn, &set);
return 0;
}
static void cliHandleException(SCliConn* conn) {
@ -1166,6 +1210,10 @@ static void cliHandleException(SCliConn* conn) {
cliDestroyAllQidFromThrd(conn);
QUEUE_REMOVE(&conn->q);
if (conn->list) {
conn->list->totaSize -= 1;
conn->list = NULL;
}
if (conn->registered) {
int8_t ref = transGetRefCount(conn);
@ -1203,6 +1251,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
SCliConn* conn = wrapper->arg;
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
freeWReqToWQ(&conn->wq, wrapper);
@ -1217,7 +1266,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
return;
}
cliConnMayUpdateTimer(conn, READ_TIMEOUT);
cliConnMayUpdateTimer(conn, pInst->readTimeout);
if (conn->readerStart == 0) {
code = uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
if (code != 0) {
@ -1709,6 +1758,16 @@ static void doFreeTimeoutMsg(void* param) {
taosMemoryFree(arg);
}
/*
 * Stamp the qid state of pReq with the current time.
 *
 * When the request carries a positive qid and pConn->pQTable holds a context for
 * it, that context's `st` field is set to taosGetTimestampUs(). Requests without
 * a qid, or whose qid has no context, are left untouched.
 *
 * Always returns 0.
 */
int32_t cliHandleState_mayUpdateStateTime(SCliConn* pConn, SCliReq* pReq) {
  int64_t qid = pReq->msg.info.qId;
  if (qid <= 0) {
    return 0;
  }

  STransCtx* pStateCtx = taosHashGet(pConn->pQTable, &qid, sizeof(qid));
  if (pStateCtx == NULL) {
    return 0;
  }

  pStateCtx->st = taosGetTimestampUs();
  return 0;
}
int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) {
int32_t code = 0;
int64_t qid = pReq->msg.info.qId;
@ -1721,14 +1780,17 @@ int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) {
STransCtx* pUserCtx = taosHashGet(pConn->pQTable, &qid, sizeof(qid));
if (pUserCtx == NULL) {
pCtx->userCtx.st = taosGetTimestampUs();
code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx));
tDebug("%s conn %p succ to add statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
} else {
transCtxMerge(pUserCtx, &pCtx->userCtx);
pUserCtx->st = taosGetTimestampUs();
tDebug("%s conn %p succ to update statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
}
return 0;
}
int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
int32_t code = 0;
int64_t qid = pReq->msg.info.qId;
@ -2247,6 +2309,11 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
pThrd->pQIdBuf = taosArrayInit(8, sizeof(int64_t));
if (pThrd->pQIdBuf == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
pThrd->initCb = initCb;
pThrd->notifyCb = notfiyCb;
pThrd->notifyExceptCb = notifyExceptCb;
@ -2281,6 +2348,7 @@ _end:
taosHashCleanup(pThrd->failFastCache);
taosHashCleanup(pThrd->batchCache);
taosHashCleanup(pThrd->pIdConnTable);
taosArrayDestroy(pThrd->pQIdBuf);
taosMemoryFree(pThrd);
}
@ -2343,6 +2411,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosHashCleanup(pThrd->pIdConnTable);
taosMemoryFree(pThrd->pCvtAddr);
taosArrayDestroy(pThrd->pQIdBuf);
taosMemoryFree(pThrd);
}
@ -3442,12 +3511,119 @@ static int32_t getOrCreateHeap(SHashObj* pConnHeapCache, char* key, SHeap** pHea
return code;
}
static FORCE_INLINE int8_t shouldSWitchToOtherConn(STrans* pInst, int32_t reqNum, int32_t sentNum, int32_t stateNum) {
int32_t total = reqNum + sentNum;
if (stateNum >= pInst->shareConnLimit) {
return 1;
// Queue filter selecting timed-out requests that are not bound to a qid.
//   key: queue node embedded in an SCliReq (member `q`).
//   arg: SListFilterArg*; `id` is the reference timestamp in microseconds and
//        `pInst->readTimeout` is the limit in seconds. A NULL arg selects nothing.
// Returns true only for a request with qId == 0 that expects a response, has a
// ctx, and whose age ((id - pReq->st) / 1000000) exceeds pInst->readTimeout.
bool filterTimeoutReq(void* key, void* arg) {
SListFilterArg* listArg = arg;
if (listArg == NULL) {
return false;
}
// NOTE(review): the next line looks like a leftover of the removed
// shouldSWitchToOtherConn body in this diff rendering (`total` and `pInst` are
// not declared in this function) — confirm against the real file.
if (total >= pInst->shareConnLimit) {
int64_t st = listArg->id;
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) {
// microsecond difference converted to whole seconds
int64_t elapse = ((st - pReq->st) / 1000000);
if (listArg && elapse > listArg->pInst->readTimeout) {
return true;
} else {
return false;
}
}
return false;
}
/*
 * Collect the requests of every timed-out qid state on pConn.
 *
 * A qid context in pConn->pQTable counts as timed out when
 * (*st - ctx->st) / 1000000 exceeds pInst->readTimeout. For each such qid:
 *   - the qid is removed from pThrd->pIdConnTable and its ex-handle is
 *     released and removed;
 *   - its requests are moved from reqsSentOut and reqsToSend into `set`;
 *   - its context is cleaned up and removed from pConn->pQTable.
 * The qids are buffered in pThrd->pQIdBuf during iteration and only removed
 * from pQTable after the iteration has finished; the buffer is cleared on exit.
 *
 * pConn  connection to scan.
 * st     reference timestamp in microseconds.
 * set    queue head that receives the removed requests.
 */
static void cliConnRemoveTimoutQidMsg(SCliConn* pConn, int64_t* st, queue* set) {
  int32_t   code = 0;
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pInst = pThrd->pInst;
  SArray*   pQIdBuf = pThrd->pQIdBuf;

  void* pIter = taosHashIterate(pConn->pQTable, NULL);
  while (pIter) {
    STransCtx* pCtx = (STransCtx*)pIter;
    int64_t*   qid = taosHashGetKey(pIter, NULL);

    if (((*st - pCtx->st) / 1000000) > pInst->readTimeout) {
      code = taosHashRemove(pThrd->pIdConnTable, qid, sizeof(*qid));
      if (code != 0) {
        // The log formats in this function lacked "%" before PRId64 and were
        // missing the label/conn/qid arguments.
        tError("%s conn %p failed to remove state qid:%" PRId64 " since %s", pInst->label, pConn, *qid,
               tstrerror(code));
      }

      transReleaseExHandle(transGetRefMgt(), *qid);
      transRemoveExHandle(transGetRefMgt(), *qid);

      if (taosArrayPush(pQIdBuf, qid) == NULL) {
        code = terrno;
        tError("%s conn %p failed to add qid:%" PRId64 " since %s", pInst->label, pConn, *qid, tstrerror(code));
        // NOTE(review): leaving the loop mid-iteration — confirm whether the hash
        // iterator has to be cancelled explicitly before breaking out.
        break;
      }
    }

    // Advance the iterator; the result was previously discarded, so the loop
    // never moved past the first entry.
    pIter = taosHashIterate(pConn->pQTable, pIter);
  }

  for (int32_t i = 0; i < taosArrayGetSize(pQIdBuf); i++) {
    int64_t* qid = taosArrayGet(pQIdBuf, i);

    // `set` is already a queue*; passing `&set` handed the filter helper the
    // address of this local parameter instead of the caller's queue.
    transQueueRemoveByFilter(&pConn->reqsSentOut, filterByQid, qid, set, -1);
    transQueueRemoveByFilter(&pConn->reqsToSend, filterByQid, qid, set, -1);

    STransCtx* p = taosHashGet(pConn->pQTable, qid, sizeof(*qid));
    transCtxCleanup(p);
    code = taosHashRemove(pConn->pQTable, qid, sizeof(*qid));
    if (code != 0) {
      tError("%s conn %p failed to drop ctx of qid:%" PRId64 " since %s", pInst->label, pConn, *qid,
             tstrerror(code));
    }
  }

  taosArrayClear(pQIdBuf);
}
/*
 * Move timed-out requests that are not bound to a qid from pConn->reqsToSend
 * into `set`, using filterTimeoutReq as the selector.
 *
 * pConn  connection to scan.
 * st     reference timestamp in microseconds.
 * set    queue head that receives the removed requests.
 */
static void cliConnRemoveTimeoutNoQidMsg(SCliConn* pConn, int64_t* st, queue* set) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pInst = pThrd->pInst;

  // filterTimeoutReq casts its argument to SListFilterArg*. The previous code
  // built `arg` but passed `st` (an int64_t*), so the filter read pInst from
  // memory past the timestamp. It also passed `&set` although `set` is already
  // a queue*.
  SListFilterArg arg = {.id = *st, .pInst = pInst};
  transQueueRemoveByFilter(&pConn->reqsToSend, filterTimeoutReq, &arg, set, -1);
}
/*
 * Purge timed-out requests from pConn.
 *
 * Gathers the timed-out qid-bound requests and the timed-out requests without
 * a qid into a local queue, measured against a single timestamp taken here,
 * then notifies and destroys them via destroyReqInQueue.
 *
 * Returns 1 when at least one request was collected, 0 otherwise.
 */
static int8_t cliConnRemoveTimeoutMsg(SCliConn* pConn) {
  SCliThrd* pHostThrd = pConn->hostThrd;
  STrans*   pInst = pHostThrd->pInst;

  queue timedOut;
  QUEUE_INIT(&timedOut);

  int64_t nowUs = taosGetTimestampUs();
  cliConnRemoveTimoutQidMsg(pConn, &nowUs, &timedOut);
  cliConnRemoveTimeoutNoQidMsg(pConn, &nowUs, &timedOut);

  if (!QUEUE_IS_EMPTY(&timedOut)) {
    tDebug("%s conn %p do remove timeout msg", pInst->label, pConn);
    destroyReqInQueue(pConn, &timedOut);
    return 1;
  }

  return 0;
}
static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap, pConn->reqRefCnt);
int32_t reqsNum = transQueueSize(&pConn->reqsToSend);
int32_t reqsSentOut = transQueueSize(&pConn->reqsSentOut);
int32_t stateNum = taosHashGetSize(pConn->pQTable);
int32_t totalReqs = reqsNum + reqsSentOut;
if (stateNum >= pInst->shareConnLimit || totalReqs >= pInst->shareConnLimit) {
if (pConn->list == NULL && pConn->dstAddr != NULL) {
pConn->list = taosHashGet((SHashObj*)pThrd->pool, pConn->dstAddr, strlen(pConn->dstAddr));
}
if (pConn->list && pConn->list->totaSize >= pInst->connLimitNum / 4) {
tWarn("%s conn %p try to remove timeout msg since too many conn created", transLabel(pInst), pConn);
if (cliConnRemoveTimeoutMsg(pConn)) {
tDebug("%s conn %p succ to remove timeout msg", transLabel(pInst), pConn);
}
return 1;
}
// check req timeout or not
return 1;
}
@ -3487,13 +3663,7 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
tDebug("failed to get conn from heap cache for key:%s", key);
return NULL;
} else {
tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap, pConn->reqRefCnt);
int32_t reqsNum = transQueueSize(&pConn->reqsToSend);
int32_t reqsSentOut = transQueueSize(&pConn->reqsSentOut);
int32_t stateNum = taosHashGetSize(pConn->pQTable);
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
if (shouldSWitchToOtherConn(pInst, reqsNum, reqsSentOut, stateNum)) {
if (shouldSWitchToOtherConn(pConn, key)) {
logConnMissHit(pConn);
return NULL;
}