diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 681d1beb79..2076906f70 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -135,6 +135,9 @@ extern int32_t tsTtlPushInterval; extern int32_t tsGrantHBInterval; extern int32_t tsUptimeInterval; +extern int32_t tsRpcRetryLimit; +extern int32_t tsRpcRetryInterval; + //#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 8a0cccc71f..8cc37910fd 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -76,12 +76,14 @@ typedef void (*RpcDfp)(void *ahandle); typedef struct SRpcInit { char localFqdn[TSDB_FQDN_LEN]; - uint16_t localPort; // local port - char *label; // for debug purpose - int32_t numOfThreads; // number of threads to handle connections - int32_t sessions; // number of sessions allowed - int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS - int32_t idleTime; // milliseconds, 0 means idle timer is disabled + uint16_t localPort; // local port + char *label; // for debug purpose + int32_t numOfThreads; // number of threads to handle connections + int32_t sessions; // number of sessions allowed + int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS + int32_t idleTime; // milliseconds, 0 means idle timer is disabled + int32_t retryLimit; // retry limit + int32_t retryInterval; // retry interval ms int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int8_t encryption; // encrypt or not diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index bb93e4d934..3f4e1bb513 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -146,6 +146,8 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.compressSize = tsCompressMsgSize; rpcInit.dfp = destroyAhandle; + rpcInit.retryLimit = tsRpcRetryLimit; + rpcInit.retryInterval = tsRpcRetryInterval; void *pDnodeConn = rpcOpen(&rpcInit); if (pDnodeConn == NULL) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 88d7823593..c3140371c4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1971,6 +1971,8 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.compressSize = tsCompressMsgSize; rpcInit.user = "_dnd"; + rpcInit.retryLimit = tsRpcRetryLimit; + rpcInit.retryInterval = tsRpcRetryInterval; clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 1be77077b6..27dcbd5be3 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -163,10 +163,12 @@ int32_t tsMqRebalanceInterval = 2; int32_t tsTtlUnit = 86400; int32_t tsTtlPushInterval = 86400; int32_t tsGrantHBInterval = 60; -int32_t tsUptimeInterval = 300; // seconds +int32_t tsUptimeInterval = 300; // seconds char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits char tsUdfdLdLibPath[512] = ""; +int32_t tsRpcRetryLimit = 100; +int32_t tsRpcRetryInterval = 15; #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { SConfigItem *pItem = cfgGetItem(pCfg, "dataDir"); @@ -297,6 +299,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1; + if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1; tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); @@ -422,6 +426,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; + + if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1; + GRANT_CFG_ADD; return 0; } @@ -634,6 +642,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32; tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval; tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval; + + tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32; + tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32; return 0; } @@ -708,6 +719,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } + + tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32; + tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32; GRANT_CFG_GET; return 0; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 1e5f3139aa..5546d762f4 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -258,6 +258,8 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.parent = pDnode; rpcInit.rfp = rpcRfp; rpcInit.compressSize = tsCompressMsgSize; + rpcInit.retryLimit = tsRpcRetryLimit; + rpcInit.retryInterval = tsRpcRetryInterval; pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 2354f0f959..5e76e6bd83 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -94,11 +94,11 @@ typedef void* queue[2]; /* Return the structure holding the given element. */ #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) -#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit -#define TRANS_RETRY_INTERVAL 15 // retry interval (ms) -#define TRANS_CONN_TIMEOUT 3 // connect timeout (s) -#define TRANS_READ_TIMEOUT 3000 // read timeout (ms) -#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 +//#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit +//#define TRANS_RETRY_INTERVAL 15 // retry interval (ms) +#define TRANS_CONN_TIMEOUT 3 // connect timeout (s) +#define TRANS_READ_TIMEOUT 3000 // read timeout (ms) +#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 #define TRANS_MAGIC_NUM 0x5f375a86 diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 3b7182f983..c8a56081cc 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -47,8 +47,10 @@ typedef struct { char label[TSDB_LABEL_LEN]; char user[TSDB_UNI_LEN]; // meter ID - int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size - int8_t encryption; // encrypt or not + int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size + int8_t encryption; // encrypt or not + int32_t retryLimit; // retry limit + int32_t retryInterval; // retry interval ms void (*cfp)(void* parent, SRpcMsg*, SEpSet*); bool (*retry)(int32_t code, tmsg_t msgType); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index feb55985de..94bc128de9 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -48,6 +48,8 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->compressSize = pInit->compressSize; pRpc->encryption = pInit->encryption; + pRpc->retryLimit = pInit->retryLimit; + pRpc->retryInterval = pInit->retryInterval; // register callback handle pRpc->cfp = pInit->cfp; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 457d3e5cb1..fd42c14101 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1288,6 +1288,7 @@ static void doCloseIdleConn(void* param) { } static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { + STrans* pTransInst = pThrd->pTransInst; STransConnCtx* pCtx = pMsg->ctx; STraceId* trace = &pMsg->msg.info.traceId; @@ -1299,7 +1300,7 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); arg->param1 = pMsg; arg->param2 = pThrd; - transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); + transDQSched(pThrd->delayQueue, doDelayTask, arg, pTransInst->retryInterval); } FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { @@ -1351,7 +1352,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { pMsg->sent = 0; pCtx->retryCnt += 1; if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) { - cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3); + cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, EPSET_GET_SIZE(&pCtx->epSet) * 3); if (pCtx->retryCnt < pCtx->retryLimit) { transUnrefCliHandle(pConn); EPSET_FORWARD_INUSE(&pCtx->epSet); @@ -1360,7 +1361,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { return -1; } } else { - cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT); + cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, pTransInst->retryLimit); if (pCtx->retryCnt < pCtx->retryLimit) { if (pResp->contLen == 0) { EPSET_FORWARD_INUSE(&pCtx->epSet);