Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
This commit is contained in:
commit
ac282bdb35
|
@ -892,7 +892,7 @@ SDataType createDataType(uint8_t type) {
|
|||
}
|
||||
|
||||
SDataType createVarLenDataType(uint8_t type, const SToken* pLen) {
|
||||
SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = tDataTypes[type].bytes };
|
||||
SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = strtol(pLen->z, NULL, 10) };
|
||||
return dt;
|
||||
}
|
||||
|
||||
|
|
|
@ -31,10 +31,10 @@ extern "C" {
|
|||
#define TIMER_MAX_MS 0x7FFFFFFF
|
||||
#define ENV_TICK_TIMER_MS 1000
|
||||
#define PING_TIMER_MS 1000
|
||||
#define ELECT_TIMER_MS_MIN 1500
|
||||
#define ELECT_TIMER_MS_MAX 3000
|
||||
#define ELECT_TIMER_MS_MIN 150
|
||||
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
|
||||
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
|
||||
#define HEARTBEAT_TIMER_MS 300
|
||||
#define HEARTBEAT_TIMER_MS 30
|
||||
|
||||
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
|
||||
|
||||
|
|
|
@ -160,6 +160,11 @@ typedef struct SSyncNode {
|
|||
SSyncLogStore* pLogStore;
|
||||
SyncIndex commitIndex;
|
||||
|
||||
// timer ms init
|
||||
int32_t pingBaseLine;
|
||||
int32_t electBaseLine;
|
||||
int32_t hbBaseLine;
|
||||
|
||||
// ping timer
|
||||
tmr_h pPingTimer;
|
||||
int32_t pingTimerMS;
|
||||
|
|
|
@ -44,7 +44,7 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest);
|
|||
|
||||
// ---- misc ----
|
||||
int32_t syncUtilRand(int32_t max);
|
||||
int32_t syncUtilElectRandomMS();
|
||||
int32_t syncUtilElectRandomMS(int32_t min, int32_t max);
|
||||
int32_t syncUtilQuorum(int32_t replicaNum);
|
||||
cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p);
|
||||
cJSON* syncUtilRaftId2Json(const SRaftId* p);
|
||||
|
|
|
@ -242,9 +242,14 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
|
|||
assert(pSyncNode->pLogStore != NULL);
|
||||
pSyncNode->commitIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
// timer ms init
|
||||
pSyncNode->pingBaseLine = PING_TIMER_MS;
|
||||
pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
|
||||
pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;
|
||||
|
||||
// init ping timer
|
||||
pSyncNode->pPingTimer = NULL;
|
||||
pSyncNode->pingTimerMS = PING_TIMER_MS;
|
||||
pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
|
||||
atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
|
||||
atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
|
||||
pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
|
||||
|
@ -252,7 +257,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
|
|||
|
||||
// init elect timer
|
||||
pSyncNode->pElectTimer = NULL;
|
||||
pSyncNode->electTimerMS = syncUtilElectRandomMS();
|
||||
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
||||
atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
|
||||
atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
|
||||
pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
|
||||
|
@ -260,7 +265,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
|
|||
|
||||
// init heartbeat timer
|
||||
pSyncNode->pHeartbeatTimer = NULL;
|
||||
pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS;
|
||||
pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
|
||||
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
|
||||
atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
|
||||
pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
|
||||
|
@ -394,7 +399,7 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
|||
|
||||
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
int32_t electMS = syncUtilElectRandomMS();
|
||||
int32_t electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
||||
ret = syncNodeRestartElectTimer(pSyncNode, electMS);
|
||||
return ret;
|
||||
}
|
||||
|
@ -763,7 +768,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
|||
syncTimeoutDestroy(pSyncMsg);
|
||||
|
||||
// reset timer ms
|
||||
pSyncNode->electTimerMS = syncUtilElectRandomMS();
|
||||
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
||||
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||
&pSyncNode->pPingTimer);
|
||||
} else {
|
||||
|
|
|
@ -96,7 +96,10 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
|
|||
|
||||
int32_t syncUtilRand(int32_t max) { return taosRand() % max; }
|
||||
|
||||
int32_t syncUtilElectRandomMS() { return ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); }
|
||||
int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
|
||||
assert(min > 0 && max > 0 && max >= min);
|
||||
return min + syncUtilRand(max - min);
|
||||
}
|
||||
|
||||
int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; }
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ void logTest() {
|
|||
|
||||
void electRandomMSTest() {
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
int32_t ms = syncUtilElectRandomMS();
|
||||
int32_t ms = syncUtilElectRandomMS(150, 300);
|
||||
printf("syncUtilElectRandomMS: %d \n", ms);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue