reset timeout

This commit is contained in:
yihaoDeng 2024-10-14 19:19:01 +08:00
parent cc033c9260
commit 9a480e0597
1 changed files with 34 additions and 31 deletions

View File

@ -310,6 +310,7 @@ static FORCE_INLINE bool filterToDebug_timeoutMsg(void* key, void* arg);
static FORCE_INLINE bool filterToRmTimoutReq(void* key, void* arg);
static FORCE_INLINE bool filterTimeoutReq(void* key, void* arg);
static int8_t cliConnRemoveTimeoutMsg(SCliConn* pConn);
typedef struct {
void* p;
HeapNode node;
@ -682,7 +683,7 @@ void cliHandleResp(SCliConn* conn) {
}
cliConnCheckTimoutMsg(conn);
cliConnMayUpdateTimer(conn, pInst->readTimeout);
cliConnMayUpdateTimer(conn, pInst->readTimeout * 1000);
}
void cliConnTimeout(uv_timer_t* handle) {
@ -742,39 +743,40 @@ void cliConnCheckTimoutMsg(SCliConn* conn) {
if (transQueueSize(&conn->reqsSentOut) == 0) {
return;
}
code = cliConnRemoveTimeoutMsg(conn);
// QUEUE_INIT(&set);
// SListFilterArg arg = {.id = 0, .pInst = pInst};
// transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmTimoutReq, &arg, &set, -1);
QUEUE_INIT(&set);
SListFilterArg arg = {.id = 0, .pInst = pInst};
transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmTimoutReq, &arg, &set, -1);
// while (!QUEUE_IS_EMPTY(&set)) {
// queue* el = QUEUE_HEAD(&set);
// QUEUE_REMOVE(el);
// SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
// STraceId* trace = &pReq->msg.info.traceId;
// tDebug("%s conn %p req %s timeout, start to free", CONN_GET_INST_LABEL(conn), conn,
// TMSG_INFO(pReq->msg.msgType));
while (!QUEUE_IS_EMPTY(&set)) {
queue* el = QUEUE_HEAD(&set);
QUEUE_REMOVE(el);
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
STraceId* trace = &pReq->msg.info.traceId;
tDebug("%s conn %p req %s timeout, start to free", CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pReq->msg.msgType));
// SReqCtx* pCtx = pReq ? pReq->ctx : NULL;
// STransMsg resp = {0};
// resp.code = TSDB_CODE_RPC_TIMEOUT;
// resp.msgType = pReq ? pReq->msg.msgType + 1 : 0;
// resp.info.cliVer = pInst->compatibilityVer;
// resp.info.ahandle = pCtx ? pCtx->ahandle : 0;
// resp.info.handle = pReq->msg.info.handle;
// if (pReq) {
// resp.info.traceId = pReq->msg.info.traceId;
// }
SReqCtx* pCtx = pReq ? pReq->ctx : NULL;
STransMsg resp = {0};
resp.code = TSDB_CODE_RPC_TIMEOUT;
resp.msgType = pReq ? pReq->msg.msgType + 1 : 0;
resp.info.cliVer = pInst->compatibilityVer;
resp.info.ahandle = pCtx ? pCtx->ahandle : 0;
resp.info.handle = pReq->msg.info.handle;
if (pReq) {
resp.info.traceId = pReq->msg.info.traceId;
}
pReq->seq = 0;
code = cliNotifyCb(conn, pReq, &resp);
if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
continue;
} else {
// already notify user
destroyReq(pReq);
// destroyReqWrapper(pReq, pThrd);
}
}
// pReq->seq = 0;
// code = cliNotifyCb(conn, pReq, &resp);
// if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
// continue;
// } else {
// // already notify user
// destroyReq(pReq);
// // destroyReqWrapper(pReq, pThrd);
// }
// }
return;
}
@ -3711,6 +3713,7 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) {
}
if (pConn->list && pConn->list->totaSize >= pInst->connLimitNum / 4) {
tWarn("%s conn %p try to remove timeout msg since too many conn created", transLabel(pInst), pConn);
if (cliConnRemoveTimeoutMsg(pConn)) {
tWarn("%s conn %p succ to remove timeout msg", transLabel(pInst), pConn);
}