From b6fe5d917bd980daa009565a0e9f59fb5ae4c3ff Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 17 Jun 2023 14:05:49 +0800 Subject: [PATCH] refactor checkpoint --- include/libs/stream/tstream.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 10 +++------- source/libs/stream/src/streamMeta.c | 7 +++++++ 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f220d65f84..425c9449f3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -346,6 +346,7 @@ typedef struct SStreamMeta { int32_t walScanCounter; void* streamBackend; int64_t streamBackendRid; + int64_t checkpointTs; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 07463009bf..40db8c7fac 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -944,8 +944,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in return 0; } -static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, SHashObj *vgIds, - int64_t checkpointId) { +static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) { int64_t timestampMs = taosGetTimestampMs(); if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) { return -1; @@ -967,7 +966,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre for (int32_t i = 0; i < totLevel; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); SStreamTask *pTask = taosArrayGetP(pLevel, 0); - if (pTask->taskLevel == TASK_LEVEL__SOURCE && NULL == taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId))) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { int32_t sz = taosArrayGetSize(pLevel); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); @@ -1003,7 +1002,6 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre mndTransDrop(pTrans); return -1; } - taosHashPut(vgIds, &pTask->nodeId, sizeof(pTask->nodeId), &pTask->nodeId, sizeof(pTask->nodeId)); } } } @@ -1054,17 +1052,15 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; int64_t checkpointId = pMsg->checkpointId; - SHashObj *vgIds = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; - code = mndProcessStreamCheckpointTrans(pMnode, pStream, vgIds, checkpointId); + code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId); if (code == -1) { mInfo("stream:%s failed to do checkpoint, reason: last checkpoint not finished", pStream->name); } sdbRelease(pSdb, pStream); } - taosHashCleanup(vgIds); return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c840876884..35b7622c73 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -407,6 +407,13 @@ int32_t streamDoCheckpoint(SStreamMeta* pMeta) { int code = -1; char buf[256] = {0}; + int64_t ts = taosGetTimestampMs(); + if (ts - pMeta->checkpointTs <= tsStreamCheckpointTickInterval * 1000) { + // avoid do checkpoint freq + return 0; + } + pMeta->checkpointTs = ts; + sprintf(buf, "%s/%s", pMeta->path, "checkpoints"); code = taosMulModeMkDir(buf, 0755); if (code != 0) {