chore: build checkpoint for rsma

This commit is contained in:
kailixu 2023-11-01 09:45:58 +08:00
parent e6684fa5de
commit ad1e6accd4
4 changed files with 12 additions and 6 deletions

View File

@ -772,6 +772,7 @@ void streamMetaInitForSnode(SStreamMeta* pMeta);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask);

View File

@ -590,7 +590,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, CFG_SCOPE_SERVER) != 0) return -1;
tsNumOfVnodeRsmaThreads = tsNumOfCores / 4;
tsNumOfVnodeRsmaThreads = tsNumOfCores / 2;
tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER) != 0) return -1;

View File

@ -15,6 +15,7 @@
#include "sma.h"
#include "tq.h"
#include "tstream.h"
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
@ -1096,10 +1097,15 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
pRSmaInfo->suid, i + 1);
}
#endif
if(pItem && pItem->pStreamState) {
if (pItem && pItem->pStreamState && pItem->pStreamTask) {
SStreamTask *pTask = pItem->pStreamTask;
atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); // adaption for API streamTaskBuildCheckpoint
pTask->checkpointingId = taosGetTimestampNs();
code = streamTaskBuildCheckpoint(pTask);
TSDB_CHECK_CODE(code, lino, _exit);
smaInfo("vgId:%d, rsma persist, build stream checkpoint success, table:%" PRIi64 ", level:%d, id:%" PRIi64,
TD_VID(pVnode), pRSmaInfo->suid, i + 1, pTask->checkpointingId);
}
}
}

View File

@ -101,7 +101,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId);
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);