From bbb3d4cc6188b532448aee9e785119e4f1ec1e74 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 16 Sep 2024 16:31:02 +0800 Subject: [PATCH] Merge branch '3.0' into enh/opt-transport --- source/libs/transport/src/transCli.c | 40 ++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 72b9889afa..dbdaa1a4aa 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -196,6 +196,7 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn); static void cliBatchSendCb(uv_write_t* req, int status); void cliBatchSendImpl(SCliConn* pConn); static int32_t cliBatchSend(SCliConn* conn); +void cliConnCheckTimoutMsg(SCliConn* conn); bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead); // register conn timer static void cliConnTimeout(uv_timer_t* handle); @@ -364,7 +365,23 @@ void cliResetConnTimer(SCliConn* conn) { } } -void cliConnSetReadTimeout(SCliConn* conn, int timeout) { +void cliConnMaySetReadTimeout(SCliConn* conn, int timeout) { + if (conn->timer != NULL) { + // reset previous timer + cliResetConnTimer(conn); + } + int32_t reqsSentNum = transQueueSize(&conn->reqsSentOut); + if (reqsSentNum == 0) { + return; + } + + cliConnCheckTimoutMsg(conn); + + if (conn->timer != NULL) { + // reset previous timer + cliResetConnTimer(conn); + } + if (conn->timer == NULL) { if (cliGetConnTimer(conn->hostThrd, conn) != 0) { return; @@ -529,8 +546,6 @@ void cliHandleResp(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; - cliResetConnTimer(conn); - cliConnClearInitUserMsg(conn); SCliReq* pReq = NULL; @@ -599,7 +614,8 @@ void cliHandleResp(SCliConn* conn) { if (cliMayRecycleConn(conn)) { return; } - if (transQueueSize(&conn->reqsSentOut)) cliConnSetReadTimeout(conn, READ_TIMEOUT); + + cliConnMaySetReadTimeout(conn, READ_TIMEOUT); (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); } @@ -627,12 +643,11 @@ bool filterToRmTimoutReq(void* key, void* arg) { } return false; } -void cliConnTimeout__checkReq(uv_timer_t* handle) { + +void cliConnCheckTimoutMsg(SCliConn* conn) { int32_t code = 0; queue set; QUEUE_INIT(&set); - - SCliConn* conn = handle->data; SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; if (transQueueSize(&conn->reqsSentOut) == 0) { @@ -668,6 +683,12 @@ void cliConnTimeout__checkReq(uv_timer_t* handle) { destroyReqWrapper(pReq, pThrd); } } + + return; +} +void cliConnTimeout__checkReq(uv_timer_t* handle) { + SCliConn* conn = handle->data; + cliConnCheckTimoutMsg(conn); } void* createConnPool(int size) { @@ -931,7 +952,6 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int conn->ipStr = taosStrdup(ip); conn->port = port; - QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; conn->status = ConnNormal; conn->broken = false; @@ -1136,9 +1156,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) { return; } - cliResetConnTimer(conn); - - if (transQueueSize(&conn->reqsSentOut)) cliConnSetReadTimeout(conn, READ_TIMEOUT); + cliConnMaySetReadTimeout(conn, READ_TIMEOUT); (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);