From 1c519bcae0068607c319de7c80b9bbaa525f2987 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 10 Nov 2022 18:43:57 +0800 Subject: [PATCH] redefine timer --- source/libs/transport/inc/transComm.h | 2 +- source/libs/transport/src/transCli.c | 36 +++++++++++++++------------ 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 2354f0f959..9dcc174f9b 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -96,7 +96,7 @@ typedef void* queue[2]; #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit #define TRANS_RETRY_INTERVAL 15 // retry interval (ms) -#define TRANS_CONN_TIMEOUT 3 // connect timeout (s) +#define TRANS_CONN_TIMEOUT 3000 // connect timeout (s) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_PACKET_LIMIT 1024 * 1024 * 512 diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b91875e84b..3af0747bcf 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -25,7 +25,9 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; queue wreqQueue; - uv_timer_t* timer; + + uv_timer_t* timer; // read timer, forbidden + uv_timer_t connTimer; void* hostThrd; @@ -102,6 +104,8 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); +// register conn timer +static void cliConnTimeout(uv_timer_t* handle); // register timer for read static void cliReadTimeoutCb(uv_timer_t* handle); // register timer in each thread to clear expire conn @@ -462,6 +466,12 @@ void cliHandleExcept(SCliConn* conn) { cliHandleExceptImpl(conn, -1); } +void cliConnTimeout(uv_timer_t* handle) { + SCliConn* conn = handle->data; + tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); + uv_timer_stop(handle); + cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT); +} void cliReadTimeoutCb(uv_timer_t* handle) { // set up timeout cb SCliConn* conn = handle->data; @@ -638,8 +648,10 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - conn->connReq.data = conn; + uv_timer_init(pThrd->loop, &conn->connTimer); + conn->connTimer.data = conn; + conn->connReq.data = conn; transReqQueueInit(&conn->wreqQueue); transQueueInit(&conn->cliMsgs, NULL); @@ -819,6 +831,8 @@ _RETURN: void cliConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; + uv_timer_stop(&pConn->connTimer); + if (status != 0) { tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); cliHandleExcept(pConn); @@ -997,31 +1011,21 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet)); conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet); - int ret = transSetConnOption((uv_tcp_t*)conn->stream); - if (ret) { - tError("%s conn %p failed to set conn option, errmsg %s", transLabel(pTransInst), conn, uv_err_name(ret)); - } - int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT); - if (fd == -1) { - tTrace("%s conn %p failed to create socket", transLabel(pTransInst), conn); - cliHandleExcept(conn); - return; - } - uv_tcp_open((uv_tcp_t*)conn->stream, fd); - struct sockaddr_in addr; addr.sin_family = AF_INET; - addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip); addr.sin_port = (uint16_t)htons((uint16_t)conn->port); tTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port); - ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + + int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { tTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, uv_err_name(ret)); + uv_timer_stop(&conn->connTimer); cliHandleExcept(conn); return; } + uv_timer_start(&conn->connTimer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); } STraceId* trace = &pMsg->msg.info.traceId; tGTrace("%s conn %p ready", pTransInst->label, conn);