refactor checkpoint

This commit is contained in:
yihaoDeng 2023-06-16 18:04:57 +08:00
parent 5cd35cc18c
commit 8937192799
1 changed files with 4 additions and 1 deletions

View File

@ -946,9 +946,11 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
} }
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, SHashObj *vgIds, static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, SHashObj *vgIds,
int64_t checkpointId) { int64_t checkpointId) {
if (checkpointId == pStream->checkpointId) { int64_t timestampMs = taosGetTimestampMs();
if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) {
return -1; return -1;
} }
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
if (pTrans == NULL) return -1; if (pTrans == NULL) return -1;
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
@ -1008,6 +1010,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
// 2. reset tick // 2. reset tick
pStream->checkpointFreq = checkpointId; pStream->checkpointFreq = checkpointId;
pStream->checkpointId = checkpointId; pStream->checkpointId = checkpointId;
pStream->checkpointFreq = taosGetTimestampMs();
atomic_store_64(&pStream->currentTick, 0); atomic_store_64(&pStream->currentTick, 0);
// 3. commit log: stream checkpoint info // 3. commit log: stream checkpoint info
pStream->version = pStream->version + 1; pStream->version = pStream->version + 1;