diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index d96a55c74c..35e79cf45d 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -26,7 +26,10 @@ extern "C" { extern bool gRaftDetailLog; -#define SYNC_RESP_TTL_MS 10000000 +#define SYNC_RESP_TTL_MS 10000000 +#define SYNC_SPEED_UP_HB_TIMER 400 +#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) +#define SYNC_SLOW_DOWN_RANGE 100 #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 @@ -205,6 +208,7 @@ int32_t syncSetStandby(int64_t rid); ESyncState syncGetMyRole(int64_t rid); bool syncIsReady(int64_t rid); const char* syncGetMyRoleStr(int64_t rid); +bool syncRestoreFinish(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid); SyncGroupId syncGetVgId(int64_t rid); void syncGetEpSet(int64_t rid, SEpSet* pEpSet); diff --git a/include/util/tdef.h b/include/util/tdef.h index 6d893765fc..f4e3385651 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -359,11 +359,11 @@ typedef enum ELogicConditionType { #define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF #define TSDB_DB_MIN_WAL_RETENTION_PERIOD -1 -#define TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD 0 +#define TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD (24 * 60 * 60 * 2) #define TSDB_DB_MIN_WAL_RETENTION_SIZE -1 -#define TSDB_DEFAULT_DB_WAL_RETENTION_SIZE 0 +#define TSDB_DEFAULT_DB_WAL_RETENTION_SIZE -1 #define TSDB_DB_MIN_WAL_ROLL_PERIOD 0 -#define TSDB_DEFAULT_DB_WAL_ROLL_PERIOD 0 +#define TSDB_DEFAULT_DB_WAL_ROLL_PERIOD (24 * 60 * 60 * 1) #define TSDB_DB_MIN_WAL_SEGMENT_SIZE 0 #define TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE 0 diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 6284d7751d..c525d09057 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -730,7 +730,8 @@ void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } bool vnodeIsLeader(SVnode *pVnode) { if (!syncIsReady(pVnode->sync)) { - vDebug("vgId:%d, vnode not ready", pVnode->config.vgId); + vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync), + syncRestoreFinish(pVnode->sync)); return false; } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 250b294c19..856870caba 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -162,6 +162,9 @@ typedef struct SSyncNode { // is config changing bool changing; + int64_t startTime; + int64_t lastReplicateTime; + } SSyncNode; // open/close -------------- @@ -186,16 +189,18 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode); // timer control -------------- int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); + int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode); + int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); -int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode); +int32_t syncNodeStartHeartbeatTimerNow(SSyncNode* pSyncNode); int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode); -int32_t syncNodeRestartNowHeartbeatTimer(SSyncNode* pSyncNode); +int32_t syncNodeRestartHeartbeatTimerNow(SSyncNode* pSyncNode); int32_t syncNodeRestartNowHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms); // utils -------------- diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index dad7d9b5ec..21821be6c7 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -55,7 +55,7 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode); int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode); -int32_t syncNodeReplicate(SSyncNode* pSyncNode); +int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 50bc767556..20b4b23bf7 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -495,6 +495,18 @@ const char* syncGetMyRoleStr(int64_t rid) { return s; } +bool syncRestoreFinish(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return false; + } + ASSERT(rid == pSyncNode->rid); + bool restoreFinish = pSyncNode->restoreFinish; + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return restoreFinish; +} + SyncTerm syncGetMyTerm(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { @@ -1086,6 +1098,10 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // start raft // syncNodeBecomeFollower(pSyncNode); + int64_t timeNow = taosGetTimestampMs(); + pSyncNode->startTime = timeNow; + pSyncNode->lastReplicateTime = timeNow; + syncNodeEventLog(pSyncNode, "sync open"); return pSyncNode; @@ -1303,7 +1319,7 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { return ret; } -int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { +static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; if (syncEnvIsStart()) { taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, @@ -1322,26 +1338,21 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { return ret; } -int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms) { - int32_t ret = 0; - if (syncEnvIsStart()) { - taosTmrReset(pSyncNode->FpHeartbeatTimerCB, ms, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer); - atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); - } else { - sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); - } - - do { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", ms); - syncNodeEventLog(pSyncNode, logBuf); - } while (0); - +int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { + pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine; + int32_t ret = syncNodeDoStartHeartbeatTimer(pSyncNode); return ret; } -int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode) { - int32_t ret = syncNodeStartHeartbeatTimerMS(pSyncNode, 1); +int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms) { + pSyncNode->heartbeatTimerMS = ms; + int32_t ret = syncNodeDoStartHeartbeatTimer(pSyncNode); + return ret; +} + +int32_t syncNodeStartHeartbeatTimerNow(SSyncNode* pSyncNode) { + pSyncNode->heartbeatTimerMS = 1; + int32_t ret = syncNodeDoStartHeartbeatTimer(pSyncNode); return ret; } @@ -1362,9 +1373,9 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) { return 0; } -int32_t syncNodeRestartNowHeartbeatTimer(SSyncNode* pSyncNode) { +int32_t syncNodeRestartHeartbeatTimerNow(SSyncNode* pSyncNode) { syncNodeStopHeartbeatTimer(pSyncNode); - syncNodeStartNowHeartbeatTimer(pSyncNode); + syncNodeStartHeartbeatTimerNow(pSyncNode); return 0; } @@ -1942,9 +1953,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // Raft 3.6.2 Committing entries from previous terms syncNodeAppendNoop(pSyncNode); -#if 0 // simon - syncNodeReplicate(pSyncNode); -#endif syncMaybeAdvanceCommitIndex(pSyncNode); } else { @@ -2126,9 +2134,6 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { // Raft 3.6.2 Committing entries from previous terms syncNodeAppendNoop(pSyncNode); -#if 0 // simon - syncNodeReplicate(pSyncNode); -#endif syncMaybeAdvanceCommitIndex(pSyncNode); } @@ -2499,7 +2504,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { if (ths->state == TAOS_SYNC_STATE_LEADER) { int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); ASSERT(code == 0); - syncNodeReplicate(ths); + syncNodeReplicate(ths, false); } syncEntryDestory(pEntry); @@ -2572,7 +2577,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI // if mulit replica, start replicate right now if (ths->replicaNum > 1) { - syncNodeReplicate(ths); + syncNodeReplicate(ths, false); // pre commit syncNodePreCommit(ths, pEntry, 0); @@ -2641,7 +2646,7 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p if (ths->replicaNum > 1) { // if multi replica, start replicate right now - syncNodeReplicate(ths); + syncNodeReplicate(ths, false); } else if (ths->replicaNum == 1) { // one replica diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 1a147e391d..29370fd4a5 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -202,17 +202,19 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { syncAppendEntriesBatchDestroy(pMsg); // speed up - if (pMsg->dataCount > 0 && pMsg->prevLogIndex < pSyncNode->commitIndex) { + if (pMsg->dataCount > 0 && pSyncNode->commitIndex - pMsg->prevLogIndex > SYNC_SLOW_DOWN_RANGE) { ret = 1; +#if 0 do { char logBuf[128]; char host[64]; uint16_t port; syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); - snprintf(logBuf, sizeof(logBuf), "speed up for %s:%d, pre-index:%ld", host, port, pMsg->prevLogIndex); + snprintf(logBuf, sizeof(logBuf), "maybe speed up for %s:%d, pre-index:%ld", host, port, pMsg->prevLogIndex); syncNodeEventLog(pSyncNode, logBuf); } while (0); +#endif } } @@ -301,7 +303,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { return ret; } -int32_t syncNodeReplicate(SSyncNode* pSyncNode) { +int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer) { // start replicate int32_t ret = 0; @@ -323,13 +325,38 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { break; } - if (ret > 0) { + // start delay + int64_t timeNow = taosGetTimestampMs(); + int64_t startDelay = timeNow - pSyncNode->startTime; + + // replicate delay + int64_t replicateDelay = timeNow - pSyncNode->lastReplicateTime; + pSyncNode->lastReplicateTime = timeNow; + + if (ret > 0 && isTimer && startDelay > SYNC_SPEED_UP_AFTER_MS) { // speed up replicate - int32_t ms = pSyncNode->heartbeatTimerMS < 50 ? pSyncNode->heartbeatTimerMS : 50; + int32_t ms = + pSyncNode->heartbeatTimerMS < SYNC_SPEED_UP_HB_TIMER ? pSyncNode->heartbeatTimerMS : SYNC_SPEED_UP_HB_TIMER; syncNodeRestartNowHeartbeatTimerMS(pSyncNode, ms); +#if 0 + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "replicate speed up"); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); +#endif + } else { syncNodeRestartHeartbeatTimer(pSyncNode); + +#if 0 + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "replicate slow down"); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); +#endif } return ret; diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 15b1054e63..65314b812a 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -58,7 +58,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { ++(ths->heartbeatTimerCounter); sInfo("vgId:%d, sync timeout, type:replicate count:%d, heartbeatTimerLogicClockUser:%ld", ths->vgId, ths->heartbeatTimerCounter, ths->heartbeatTimerLogicClockUser); - syncNodeReplicate(ths); + syncNodeReplicate(ths, true); } } else { sError("vgId:%d, unknown timeout-type:%d", ths->vgId, pMsg->timeoutType);