From 9a480e0597cf9a869ff85e2a85dc6779b9b4fcbe Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 14 Oct 2024 19:19:01 +0800 Subject: [PATCH] reset timeout --- source/libs/transport/src/transCli.c | 65 +++++++++++++++------------- 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1074f050fa..6797716c7b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -310,6 +310,7 @@ static FORCE_INLINE bool filterToDebug_timeoutMsg(void* key, void* arg); static FORCE_INLINE bool filterToRmTimoutReq(void* key, void* arg); static FORCE_INLINE bool filterTimeoutReq(void* key, void* arg); +static int8_t cliConnRemoveTimeoutMsg(SCliConn* pConn); typedef struct { void* p; HeapNode node; @@ -682,7 +683,7 @@ void cliHandleResp(SCliConn* conn) { } cliConnCheckTimoutMsg(conn); - cliConnMayUpdateTimer(conn, pInst->readTimeout); + cliConnMayUpdateTimer(conn, pInst->readTimeout * 1000); } void cliConnTimeout(uv_timer_t* handle) { @@ -742,39 +743,40 @@ void cliConnCheckTimoutMsg(SCliConn* conn) { if (transQueueSize(&conn->reqsSentOut) == 0) { return; } + code = cliConnRemoveTimeoutMsg(conn); + // QUEUE_INIT(&set); + // SListFilterArg arg = {.id = 0, .pInst = pInst}; + // transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmTimoutReq, &arg, &set, -1); - QUEUE_INIT(&set); - SListFilterArg arg = {.id = 0, .pInst = pInst}; - transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmTimoutReq, &arg, &set, -1); + // while (!QUEUE_IS_EMPTY(&set)) { + // queue* el = QUEUE_HEAD(&set); + // QUEUE_REMOVE(el); + // SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); + // STraceId* trace = &pReq->msg.info.traceId; + // tDebug("%s conn %p req %s timeout, start to free", CONN_GET_INST_LABEL(conn), conn, + // TMSG_INFO(pReq->msg.msgType)); - while (!QUEUE_IS_EMPTY(&set)) { - queue* el = QUEUE_HEAD(&set); - QUEUE_REMOVE(el); - SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); - STraceId* trace = &pReq->msg.info.traceId; - tDebug("%s conn %p req %s timeout, start to free", CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pReq->msg.msgType)); + // SReqCtx* pCtx = pReq ? pReq->ctx : NULL; + // STransMsg resp = {0}; + // resp.code = TSDB_CODE_RPC_TIMEOUT; + // resp.msgType = pReq ? pReq->msg.msgType + 1 : 0; + // resp.info.cliVer = pInst->compatibilityVer; + // resp.info.ahandle = pCtx ? pCtx->ahandle : 0; + // resp.info.handle = pReq->msg.info.handle; + // if (pReq) { + // resp.info.traceId = pReq->msg.info.traceId; + // } - SReqCtx* pCtx = pReq ? pReq->ctx : NULL; - STransMsg resp = {0}; - resp.code = TSDB_CODE_RPC_TIMEOUT; - resp.msgType = pReq ? pReq->msg.msgType + 1 : 0; - resp.info.cliVer = pInst->compatibilityVer; - resp.info.ahandle = pCtx ? pCtx->ahandle : 0; - resp.info.handle = pReq->msg.info.handle; - if (pReq) { - resp.info.traceId = pReq->msg.info.traceId; - } - - pReq->seq = 0; - code = cliNotifyCb(conn, pReq, &resp); - if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { - continue; - } else { - // already notify user - destroyReq(pReq); - // destroyReqWrapper(pReq, pThrd); - } - } + // pReq->seq = 0; + // code = cliNotifyCb(conn, pReq, &resp); + // if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { + // continue; + // } else { + // // already notify user + // destroyReq(pReq); + // // destroyReqWrapper(pReq, pThrd); + // } + // } return; } @@ -3711,6 +3713,7 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) { } if (pConn->list && pConn->list->totaSize >= pInst->connLimitNum / 4) { tWarn("%s conn %p try to remove timeout msg since too many conn created", transLabel(pInst), pConn); + if (cliConnRemoveTimeoutMsg(pConn)) { tWarn("%s conn %p succ to remove timeout msg", transLabel(pInst), pConn); }