diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index eed6391d5c..1e51288e02 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -892,7 +892,7 @@ SDataType createDataType(uint8_t type) { } SDataType createVarLenDataType(uint8_t type, const SToken* pLen) { - SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = tDataTypes[type].bytes }; + SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = strtol(pLen->z, NULL, 10) }; return dt; } diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 66c7c8620d..7940d7f436 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -31,10 +31,10 @@ extern "C" { #define TIMER_MAX_MS 0x7FFFFFFF #define ENV_TICK_TIMER_MS 1000 #define PING_TIMER_MS 1000 -#define ELECT_TIMER_MS_MIN 1500 -#define ELECT_TIMER_MS_MAX 3000 +#define ELECT_TIMER_MS_MIN 150 +#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) -#define HEARTBEAT_TIMER_MS 300 +#define HEARTBEAT_TIMER_MS 30 #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index eaf4d56f67..cfa87048a7 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -160,6 +160,11 @@ typedef struct SSyncNode { SSyncLogStore* pLogStore; SyncIndex commitIndex; + // timer ms init + int32_t pingBaseLine; + int32_t electBaseLine; + int32_t hbBaseLine; + // ping timer tmr_h pPingTimer; int32_t pingTimerMS; diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 1b702c2528..a4bb11afc6 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -44,7 +44,7 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); // ---- misc ---- int32_t syncUtilRand(int32_t max); -int32_t syncUtilElectRandomMS(); +int32_t syncUtilElectRandomMS(int32_t min, int32_t max); int32_t syncUtilQuorum(int32_t replicaNum); cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p); cJSON* syncUtilRaftId2Json(const SRaftId* p); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index facb0180f9..b92317ef3f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -242,9 +242,14 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { assert(pSyncNode->pLogStore != NULL); pSyncNode->commitIndex = SYNC_INDEX_INVALID; + // timer ms init + pSyncNode->pingBaseLine = PING_TIMER_MS; + pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN; + pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS; + // init ping timer pSyncNode->pPingTimer = NULL; - pSyncNode->pingTimerMS = PING_TIMER_MS; + pSyncNode->pingTimerMS = pSyncNode->pingBaseLine; atomic_store_64(&pSyncNode->pingTimerLogicClock, 0); atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0); pSyncNode->FpPingTimerCB = syncNodeEqPingTimer; @@ -252,7 +257,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init elect timer pSyncNode->pElectTimer = NULL; - pSyncNode->electTimerMS = syncUtilElectRandomMS(); + pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); atomic_store_64(&pSyncNode->electTimerLogicClock, 0); atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0); pSyncNode->FpElectTimerCB = syncNodeEqElectTimer; @@ -260,7 +265,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init heartbeat timer pSyncNode->pHeartbeatTimer = NULL; - pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS; + pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine; atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0); atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0); pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer; @@ -394,7 +399,7 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - int32_t electMS = syncUtilElectRandomMS(); + int32_t electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); ret = syncNodeRestartElectTimer(pSyncNode, electMS); return ret; } @@ -763,7 +768,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { syncTimeoutDestroy(pSyncMsg); // reset timer ms - pSyncNode->electTimerMS = syncUtilElectRandomMS(); + pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 9486405331..8fc17ccb51 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -96,7 +96,10 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { int32_t syncUtilRand(int32_t max) { return taosRand() % max; } -int32_t syncUtilElectRandomMS() { return ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); } +int32_t syncUtilElectRandomMS(int32_t min, int32_t max) { + assert(min > 0 && max > 0 && max >= min); + return min + syncUtilRand(max - min); +} int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; } diff --git a/source/libs/sync/test/syncUtilTest.cpp b/source/libs/sync/test/syncUtilTest.cpp index 663db3a7b3..ae2afd2f53 100644 --- a/source/libs/sync/test/syncUtilTest.cpp +++ b/source/libs/sync/test/syncUtilTest.cpp @@ -16,7 +16,7 @@ void logTest() { void electRandomMSTest() { for (int i = 0; i < 10; ++i) { - int32_t ms = syncUtilElectRandomMS(); + int32_t ms = syncUtilElectRandomMS(150, 300); printf("syncUtilElectRandomMS: %d \n", ms); } }