diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index d9712dde4a..5d5dc00ad8 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -131,6 +131,7 @@ typedef struct SRpcInit { int32_t batchSize; int8_t shareConn; // 0: no share, 1. share int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait + int8_t startReadTimer; void *parent; } SRpcInit; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 1861144bf0..060fce5808 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -109,7 +109,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { int32_t svrVer = 0; (void)taosVersionStrToInt(version, &svrVer); if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) { - dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer, pRpc->info.conn.clientIp); + dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer, + pRpc->info.conn.clientIp); goto _OVER; } @@ -391,7 +392,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.notWaitAvaliableConn = 0; (void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); - + rpcInit.startReadTimer = 1; pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { dError("failed to init dnode rpc client since:%s", tstrerror(terrno)); @@ -434,11 +435,12 @@ int32_t dmInitStatusClient(SDnode *pDnode) { rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + rpcInit.startReadTimer = 1; (void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); pTrans->statusRpc = rpcOpen(&rpcInit); if (pTrans->statusRpc == NULL) { - dError("failed to init dnode rpc status client since %s", tstrerror(terrno)); + dError("failed to init dnode rpc status client since %s", tstrerror(terrno)); return terrno; } @@ -480,7 +482,7 @@ int32_t dmInitSyncClient(SDnode *pDnode) { rpcInit.batchSize = 8 * 1024; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; (void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); - + rpcInit.startReadTimer = 1; pTrans->syncRpc = rpcOpen(&rpcInit); if (pTrans->syncRpc == NULL) { dError("failed to init dnode rpc sync client since %s", tstrerror(terrno)); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 8199ee21c9..8e10357f07 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -78,6 +78,7 @@ typedef struct { void* tcphandle; // returned handle from TCP initialization int64_t refId; int8_t shareConn; + int8_t startReadTimer; TdThreadMutex mutex; } SRpcInfo; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index d7ec89f871..bebbcb68d6 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -42,6 +42,8 @@ void* rpcOpen(const SRpcInit* pInit) { if (pRpc == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } + + pRpc->startReadTimer = pInit->startReadTimer; if (pInit->label) { int len = strlen(pInit->label) > sizeof(pRpc->label) ? sizeof(pRpc->label) : strlen(pInit->label); memcpy(pRpc->label, pInit->label, len); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c0141dfa3a..3e47908a86 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -365,7 +365,12 @@ void cliResetConnTimer(SCliConn* conn) { } } -void cliConnMaySetReadTimeout(SCliConn* conn, int timeout) { +void cliConnMayUpdateTimer(SCliConn* conn, int timeout) { + SCliThrd* pThrd = conn->hostThrd; + STrans* pInst = pThrd->pInst; + if (pInst->startReadTimer == 0) { + return; + } if (conn->timer != NULL) { // reset previous timer cliResetConnTimer(conn); @@ -374,9 +379,6 @@ void cliConnMaySetReadTimeout(SCliConn* conn, int timeout) { if (reqsSentNum == 0) { return; } - - cliConnCheckTimoutMsg(conn); - if (conn->timer == NULL) { if (cliGetConnTimer(conn->hostThrd, conn) != 0) { return; @@ -610,7 +612,7 @@ void cliHandleResp(SCliConn* conn) { return; } - cliConnMaySetReadTimeout(conn, READ_TIMEOUT); + cliConnMayUpdateTimer(conn, READ_TIMEOUT); (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); } @@ -656,7 +658,7 @@ void cliConnCheckTimoutMsg(SCliConn* conn) { QUEUE_REMOVE(el); SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); STraceId* trace = &pReq->msg.info.traceId; - tDebug("%s conn %p req %s timeout, start to free", CONN_GET_INST_LABEL(conn), conn, pReq->msg.msgType); + tDebug("%s conn %p req %s timeout, start to free", CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pReq->msg.msgType)); SReqCtx* pCtx = pReq ? pReq->ctx : NULL; STransMsg resp = {0}; @@ -1151,7 +1153,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) { return; } - cliConnMaySetReadTimeout(conn, READ_TIMEOUT); + cliConnMayUpdateTimer(conn, READ_TIMEOUT); (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);