From ad1e6accd4f64cfa37ad186c8f93ae987dc7fa55 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 1 Nov 2023 09:45:58 +0800 Subject: [PATCH] chore: build checkpoint for rsma --- include/libs/stream/tstream.h | 1 + source/common/src/tglobal.c | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 14 ++++++++++---- source/libs/stream/inc/streamInt.h | 1 - 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 629efa00b3..b9c6c905c9 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -772,6 +772,7 @@ void streamMetaInitForSnode(SStreamMeta* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); +int32_t streamTaskBuildCheckpoint(SStreamTask* pTask); void streamTaskClearCheckInfo(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index cc485b16dc..a5459798fa 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -590,7 +590,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, CFG_SCOPE_SERVER) != 0) return -1; - tsNumOfVnodeRsmaThreads = tsNumOfCores / 4; + tsNumOfVnodeRsmaThreads = tsNumOfCores / 2; tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER) != 0) return -1; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 4ea7d4612a..980b23986e 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -15,6 +15,7 @@ #include "sma.h" #include "tq.h" +#include "tstream.h" #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt @@ -1096,10 +1097,15 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { pRSmaInfo->suid, i + 1); } #endif - if(pItem && pItem->pStreamState) { - - } - + if (pItem && pItem->pStreamState && pItem->pStreamTask) { + SStreamTask *pTask = pItem->pStreamTask; + atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); // adaption for API streamTaskBuildCheckpoint + pTask->checkpointingId = taosGetTimestampNs(); + code = streamTaskBuildCheckpoint(pTask); + TSDB_CHECK_CODE(code, lino, _exit); + smaInfo("vgId:%d, rsma persist, build stream checkpoint success, table:%" PRIi64 ", level:%d, id:%" PRIi64, + TD_VID(pVnode), pRSmaInfo->suid, i + 1, pTask->checkpointingId); + } } } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 4cd8319a07..1f43e44dca 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -101,7 +101,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId); -int32_t streamTaskBuildCheckpoint(SStreamTask* pTask); int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);