From 8937192799d45f5ea1d3cb98a85c43a0f96750d7 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 16 Jun 2023 18:04:57 +0800 Subject: [PATCH] refactor checkpoint --- source/dnode/mnode/impl/src/mndStream.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 2b0bc98a7d..07463009bf 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -946,9 +946,11 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in } static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, SHashObj *vgIds, int64_t checkpointId) { - if (checkpointId == pStream->checkpointId) { + int64_t timestampMs = taosGetTimestampMs(); + if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) { return -1; } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); if (pTrans == NULL) return -1; mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); @@ -1008,6 +1010,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre // 2. reset tick pStream->checkpointFreq = checkpointId; pStream->checkpointId = checkpointId; + pStream->checkpointFreq = taosGetTimestampMs(); atomic_store_64(&pStream->currentTick, 0); // 3. commit log: stream checkpoint info pStream->version = pStream->version + 1;