From ab266c712f6fc5078879c1ee0850d87440eab341 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 1 Nov 2023 12:29:54 +0800 Subject: [PATCH] chore: checkpoint for rsma stream state --- include/common/tmsg.h | 5 ++++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 7 +++-- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/sma/smaOpen.c | 18 ++++++------ source/dnode/vnode/src/sma/smaRollup.c | 8 +++-- source/dnode/vnode/src/tsdb/tsdbRead2.c | 6 ++-- source/dnode/vnode/src/vnd/vnodeCfg.c | 29 ++++++++++--------- source/dnode/vnode/src/vnd/vnodeCommit.c | 6 ++-- .../tsim/sma/rsmaPersistenceRecovery.sim | 2 +- 9 files changed, 48 insertions(+), 35 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 07eb8a461a..aa39a9da30 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -451,6 +451,11 @@ typedef struct SRetention { int8_t keepUnit; } SRetention; +typedef struct SRetentionEx { + SRetention rtn; + int64_t checkpointId; +} SRetentionEx; + #define RETENTION_VALID(l, r) ((((l) == 0 && (r)->freq >= 0) || ((r)->freq > 0)) && ((r)->keep > 0)) #pragma pack(push, 1) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index c4d525a871..5a4b341662 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -134,10 +134,11 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->tsdbCfg.minRows = pCreate->minRows; pCfg->tsdbCfg.maxRows = pCreate->maxRows; for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) { - SRetention *pRetention = &pCfg->tsdbCfg.retentions[i]; - memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention)); + SRetentionEx *pRetention = &pCfg->tsdbCfg.retentions[i]; + memcpy(&pRetention->rtn, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention)); + pRetention->checkpointId = -1; if (i == 0) { - if ((pRetention->freq >= 0 && pRetention->keep > 0)) pCfg->isRsma = 1; + if ((pRetention->rtn.freq >= 0 && pRetention->rtn.keep > 0)) pCfg->isRsma = 1; } } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6a0c991be4..e92fc04f6e 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -287,7 +287,7 @@ struct STsdbCfg { int32_t keep1; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead int32_t keep2; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead int32_t keepTimeOffset; // just for save config, use STsdbKeepCfg in STsdb instead - SRetention retentions[TSDB_RETENTION_MAX]; + SRetentionEx retentions[TSDB_RETENTION_MAX]; }; typedef struct { diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 633e096314..cea4ccb1b7 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -16,13 +16,13 @@ #include "sma.h" #include "tsdb.h" -static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration); +static int32_t smaEvalDays(SVnode *pVnode, SRetentionEx *r, int8_t level, int8_t precision, int32_t duration); static int32_t smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type); static int32_t rsmaRestore(SSma *pSma); #define SMA_SET_KEEP_CFG(v, l) \ do { \ - SRetention *r = &pCfg->retentions[l]; \ + SRetention *r = &(pCfg->retentions[l].rtn); \ pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \ pKeepCfg->keep0 = pKeepCfg->keep2; \ pKeepCfg->keep1 = pKeepCfg->keep2; \ @@ -32,7 +32,7 @@ static int32_t rsmaRestore(SSma *pSma); #define SMA_OPEN_RSMA_IMPL(v, l, force) \ do { \ - SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \ + SRetention *r = &(((SRetentionEx *)VND_RETENTIONS(v) + l)->rtn); \ if (!RETENTION_VALID(l, r)) { \ if (l == 0) { \ code = TSDB_CODE_INVALID_PARA; \ @@ -59,9 +59,9 @@ static int32_t rsmaRestore(SSma *pSma); * @param duration * @return int32_t */ -static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration) { - int32_t freqDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->freq, precision, TIME_UNIT_MINUTE); - int32_t keepDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->keep, precision, TIME_UNIT_MINUTE); +static int32_t smaEvalDays(SVnode *pVnode, SRetentionEx *r, int8_t level, int8_t precision, int32_t duration) { + int32_t freqDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->rtn.freq, precision, TIME_UNIT_MINUTE); + int32_t keepDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->rtn.keep, precision, TIME_UNIT_MINUTE); int32_t days = duration; // min if (days < freqDuration) { @@ -76,10 +76,10 @@ static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t p goto _exit; } - freqDuration = convertTimeFromPrecisionToUnit((r + level)->freq, precision, TIME_UNIT_MINUTE); - keepDuration = convertTimeFromPrecisionToUnit((r + level)->keep, precision, TIME_UNIT_MINUTE); + freqDuration = convertTimeFromPrecisionToUnit((r + level)->rtn.freq, precision, TIME_UNIT_MINUTE); + keepDuration = convertTimeFromPrecisionToUnit((r + level)->rtn.keep, precision, TIME_UNIT_MINUTE); - int32_t nFreqTimes = (r + level)->freq / (60 * 1000); // use 60s for freq of 1st level + int32_t nFreqTimes = (r + level)->rtn.freq / (60 * 1000); // use 60s for freq of 1st level days *= (nFreqTimes > 1 ? nFreqTimes : 1); if (days < freqDuration) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 980b23986e..1b13f37141 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -266,7 +266,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat } pStreamTask->id.taskId = 0; pStreamTask->id.streamId = pRSmaInfo->suid + idx; + pStreamTask->chkInfo.startTs = taosGetTimestampMs(); pStreamTask->pMeta = pVnode->pTq->pStreamMeta; + pStreamTask->chkInfo.checkpointId = pTsdbCfg->retentions[idx + 1].checkpointId; pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); if (!pStreamState) { @@ -1096,16 +1098,18 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode), pRSmaInfo->suid, i + 1); } -#endif - if (pItem && pItem->pStreamState && pItem->pStreamTask) { +#else + if (pItem) { SStreamTask *pTask = pItem->pStreamTask; atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); // adaption for API streamTaskBuildCheckpoint pTask->checkpointingId = taosGetTimestampNs(); code = streamTaskBuildCheckpoint(pTask); TSDB_CHECK_CODE(code, lino, _exit); + (pVnode->config.tsdbCfg.retentions + i + 1)->checkpointId = pTask->checkpointingId; smaInfo("vgId:%d, rsma persist, build stream checkpoint success, table:%" PRIi64 ", level:%d, id:%" PRIi64, TD_VID(pVnode), pRSmaInfo->suid, i + 1, pTask->checkpointingId); } +#endif } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index d1919d95ba..be88a5a435 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -49,7 +49,7 @@ static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo STsdbReader* pReader); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost); -static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, +static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetentionEx* retentions, const char* idstr, int8_t* pLevel); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); @@ -3140,7 +3140,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } } -static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr, +static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetentionEx* retentions, const char* idStr, int8_t* pLevel) { if (VND_IS_RSMA(pVnode)) { int8_t level = 0; @@ -3151,7 +3151,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret : 1000000L); for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) { - SRetention* pRetention = retentions + level; + SRetention* pRetention = &((retentions + level)->rtn); if (pRetention->keep <= 0) { if (level > 0) { --level; diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 07bfa6c719..d429eb2a94 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -106,23 +106,24 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keepTimeOffset", pCfg->tsdbCfg.keepTimeOffset) < 0) return -1; - if (pCfg->tsdbCfg.retentions[0].keep > 0) { + if (pCfg->tsdbCfg.retentions[0].rtn.keep > 0) { int32_t nRetention = 1; - if (pCfg->tsdbCfg.retentions[1].freq > 0) { + if (pCfg->tsdbCfg.retentions[1].rtn.freq > 0) { ++nRetention; - if (pCfg->tsdbCfg.retentions[2].freq > 0) { + if (pCfg->tsdbCfg.retentions[2].rtn.freq > 0) { ++nRetention; } } SJson *pNodeRetentions = tjsonCreateArray(); tjsonAddItemToObject(pJson, "retentions", pNodeRetentions); for (int32_t i = 0; i < nRetention; ++i) { - SJson *pNodeRetention = tjsonCreateObject(); - const SRetention *pRetention = pCfg->tsdbCfg.retentions + i; - tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq); - tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit); - tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->keep); - tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->keepUnit); + SJson *pNodeRetention = tjsonCreateObject(); + const SRetentionEx *pRetention = pCfg->tsdbCfg.retentions + i; + tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->rtn.freq); + tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->rtn.freqUnit); + tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->rtn.keep); + tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->rtn.keepUnit); + tjsonAddIntegerToObject(pNodeRetention, "checkpointId", pRetention->checkpointId); tjsonAddItemToArray(pNodeRetentions, pNodeRetention); } } @@ -231,10 +232,12 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { for (int32_t i = 0; i < nRetention; ++i) { SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i); ASSERT(pNodeRetention != NULL); - tjsonGetNumberValue(pNodeRetention, "freq", (pCfg->tsdbCfg.retentions)[i].freq, code); - tjsonGetNumberValue(pNodeRetention, "freqUnit", (pCfg->tsdbCfg.retentions)[i].freqUnit, code); - tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep, code); - tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit, code); + SRetentionEx *pRetention = &(pCfg->tsdbCfg.retentions[i]); + tjsonGetNumberValue(pNodeRetention, "freq", pRetention->rtn.freq, code); + tjsonGetNumberValue(pNodeRetention, "freqUnit", pRetention->rtn.freqUnit, code); + tjsonGetNumberValue(pNodeRetention, "keep", pRetention->rtn.keep, code); + tjsonGetNumberValue(pNodeRetention, "keepUnit", pRetention->rtn.keepUnit, code); + tjsonGetNumberValue(pNodeRetention, "checkpointId", pRetention->checkpointId, code); } tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code); if (code < 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 50ca2f5d03..ca4335f391 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -290,6 +290,9 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { tsem_wait(&pVnode->canCommit); if(syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit; + + code = smaPrepareAsyncCommit(pVnode->pSma); + if (code) goto _exit; pVnode->state.commitTerm = pVnode->state.applyTerm; @@ -313,9 +316,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { metaPrepareAsyncCommit(pVnode->pMeta); - code = smaPrepareAsyncCommit(pVnode->pSma); - if (code) goto _exit; - taosThreadMutexLock(&pVnode->mutex); ASSERT(pVnode->onCommit == NULL); pVnode->onCommit = pVnode->inUse; diff --git a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim index 6f78829db7..c70f2dc20a 100644 --- a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim +++ b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim @@ -5,7 +5,7 @@ sleep 50 sql connect #todo wait for streamState checkpoint -return 1 +#return 1 print =============== create database with retentions sql create database d0 retentions -:7d,5m:21d,15m:365d;