chore: checkpoint for rsma stream state

This commit is contained in:
kailixu 2023-11-01 12:29:54 +08:00
parent 1530fe0c49
commit ab266c712f
9 changed files with 48 additions and 35 deletions

View File

@ -451,6 +451,11 @@ typedef struct SRetention {
int8_t keepUnit; int8_t keepUnit;
} SRetention; } SRetention;
typedef struct SRetentionEx {
SRetention rtn;
int64_t checkpointId;
} SRetentionEx;
#define RETENTION_VALID(l, r) ((((l) == 0 && (r)->freq >= 0) || ((r)->freq > 0)) && ((r)->keep > 0)) #define RETENTION_VALID(l, r) ((((l) == 0 && (r)->freq >= 0) || ((r)->freq > 0)) && ((r)->keep > 0))
#pragma pack(push, 1) #pragma pack(push, 1)

View File

@ -134,10 +134,11 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->tsdbCfg.minRows = pCreate->minRows; pCfg->tsdbCfg.minRows = pCreate->minRows;
pCfg->tsdbCfg.maxRows = pCreate->maxRows; pCfg->tsdbCfg.maxRows = pCreate->maxRows;
for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) { for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
SRetention *pRetention = &pCfg->tsdbCfg.retentions[i]; SRetentionEx *pRetention = &pCfg->tsdbCfg.retentions[i];
memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention)); memcpy(&pRetention->rtn, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
pRetention->checkpointId = -1;
if (i == 0) { if (i == 0) {
if ((pRetention->freq >= 0 && pRetention->keep > 0)) pCfg->isRsma = 1; if ((pRetention->rtn.freq >= 0 && pRetention->rtn.keep > 0)) pCfg->isRsma = 1;
} }
} }

View File

@ -287,7 +287,7 @@ struct STsdbCfg {
int32_t keep1; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead int32_t keep1; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
int32_t keep2; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead int32_t keep2; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
int32_t keepTimeOffset; // just for save config, use STsdbKeepCfg in STsdb instead int32_t keepTimeOffset; // just for save config, use STsdbKeepCfg in STsdb instead
SRetention retentions[TSDB_RETENTION_MAX]; SRetentionEx retentions[TSDB_RETENTION_MAX];
}; };
typedef struct { typedef struct {

View File

@ -16,13 +16,13 @@
#include "sma.h" #include "sma.h"
#include "tsdb.h" #include "tsdb.h"
static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration); static int32_t smaEvalDays(SVnode *pVnode, SRetentionEx *r, int8_t level, int8_t precision, int32_t duration);
static int32_t smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type); static int32_t smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type);
static int32_t rsmaRestore(SSma *pSma); static int32_t rsmaRestore(SSma *pSma);
#define SMA_SET_KEEP_CFG(v, l) \ #define SMA_SET_KEEP_CFG(v, l) \
do { \ do { \
SRetention *r = &pCfg->retentions[l]; \ SRetention *r = &(pCfg->retentions[l].rtn); \
pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \ pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \
pKeepCfg->keep0 = pKeepCfg->keep2; \ pKeepCfg->keep0 = pKeepCfg->keep2; \
pKeepCfg->keep1 = pKeepCfg->keep2; \ pKeepCfg->keep1 = pKeepCfg->keep2; \
@ -32,7 +32,7 @@ static int32_t rsmaRestore(SSma *pSma);
#define SMA_OPEN_RSMA_IMPL(v, l, force) \ #define SMA_OPEN_RSMA_IMPL(v, l, force) \
do { \ do { \
SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \ SRetention *r = &(((SRetentionEx *)VND_RETENTIONS(v) + l)->rtn); \
if (!RETENTION_VALID(l, r)) { \ if (!RETENTION_VALID(l, r)) { \
if (l == 0) { \ if (l == 0) { \
code = TSDB_CODE_INVALID_PARA; \ code = TSDB_CODE_INVALID_PARA; \
@ -59,9 +59,9 @@ static int32_t rsmaRestore(SSma *pSma);
* @param duration * @param duration
* @return int32_t * @return int32_t
*/ */
static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration) { static int32_t smaEvalDays(SVnode *pVnode, SRetentionEx *r, int8_t level, int8_t precision, int32_t duration) {
int32_t freqDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->freq, precision, TIME_UNIT_MINUTE); int32_t freqDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->rtn.freq, precision, TIME_UNIT_MINUTE);
int32_t keepDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->keep, precision, TIME_UNIT_MINUTE); int32_t keepDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->rtn.keep, precision, TIME_UNIT_MINUTE);
int32_t days = duration; // min int32_t days = duration; // min
if (days < freqDuration) { if (days < freqDuration) {
@ -76,10 +76,10 @@ static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t p
goto _exit; goto _exit;
} }
freqDuration = convertTimeFromPrecisionToUnit((r + level)->freq, precision, TIME_UNIT_MINUTE); freqDuration = convertTimeFromPrecisionToUnit((r + level)->rtn.freq, precision, TIME_UNIT_MINUTE);
keepDuration = convertTimeFromPrecisionToUnit((r + level)->keep, precision, TIME_UNIT_MINUTE); keepDuration = convertTimeFromPrecisionToUnit((r + level)->rtn.keep, precision, TIME_UNIT_MINUTE);
int32_t nFreqTimes = (r + level)->freq / (60 * 1000); // use 60s for freq of 1st level int32_t nFreqTimes = (r + level)->rtn.freq / (60 * 1000); // use 60s for freq of 1st level
days *= (nFreqTimes > 1 ? nFreqTimes : 1); days *= (nFreqTimes > 1 ? nFreqTimes : 1);
if (days < freqDuration) { if (days < freqDuration) {

View File

@ -266,7 +266,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
} }
pStreamTask->id.taskId = 0; pStreamTask->id.taskId = 0;
pStreamTask->id.streamId = pRSmaInfo->suid + idx; pStreamTask->id.streamId = pRSmaInfo->suid + idx;
pStreamTask->chkInfo.startTs = taosGetTimestampMs();
pStreamTask->pMeta = pVnode->pTq->pStreamMeta; pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
pStreamTask->chkInfo.checkpointId = pTsdbCfg->retentions[idx + 1].checkpointId;
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
if (!pStreamState) { if (!pStreamState) {
@ -1096,16 +1098,18 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode), smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode),
pRSmaInfo->suid, i + 1); pRSmaInfo->suid, i + 1);
} }
#endif #else
if (pItem && pItem->pStreamState && pItem->pStreamTask) { if (pItem) {
SStreamTask *pTask = pItem->pStreamTask; SStreamTask *pTask = pItem->pStreamTask;
atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); // adaption for API streamTaskBuildCheckpoint atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); // adaption for API streamTaskBuildCheckpoint
pTask->checkpointingId = taosGetTimestampNs(); pTask->checkpointingId = taosGetTimestampNs();
code = streamTaskBuildCheckpoint(pTask); code = streamTaskBuildCheckpoint(pTask);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
(pVnode->config.tsdbCfg.retentions + i + 1)->checkpointId = pTask->checkpointingId;
smaInfo("vgId:%d, rsma persist, build stream checkpoint success, table:%" PRIi64 ", level:%d, id:%" PRIi64, smaInfo("vgId:%d, rsma persist, build stream checkpoint success, table:%" PRIi64 ", level:%d, id:%" PRIi64,
TD_VID(pVnode), pRSmaInfo->suid, i + 1, pTask->checkpointingId); TD_VID(pVnode), pRSmaInfo->suid, i + 1, pTask->checkpointingId);
} }
#endif
} }
} }

View File

@ -49,7 +49,7 @@ static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo
STsdbReader* pReader); STsdbReader* pReader);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost);
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetentionEx* retentions, const char* idstr,
int8_t* pLevel); int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
@ -3140,7 +3140,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} }
} }
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr, static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetentionEx* retentions, const char* idStr,
int8_t* pLevel) { int8_t* pLevel) {
if (VND_IS_RSMA(pVnode)) { if (VND_IS_RSMA(pVnode)) {
int8_t level = 0; int8_t level = 0;
@ -3151,7 +3151,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
: 1000000L); : 1000000L);
for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) { for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
SRetention* pRetention = retentions + level; SRetention* pRetention = &((retentions + level)->rtn);
if (pRetention->keep <= 0) { if (pRetention->keep <= 0) {
if (level > 0) { if (level > 0) {
--level; --level;

View File

@ -106,11 +106,11 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keepTimeOffset", pCfg->tsdbCfg.keepTimeOffset) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keepTimeOffset", pCfg->tsdbCfg.keepTimeOffset) < 0) return -1;
if (pCfg->tsdbCfg.retentions[0].keep > 0) { if (pCfg->tsdbCfg.retentions[0].rtn.keep > 0) {
int32_t nRetention = 1; int32_t nRetention = 1;
if (pCfg->tsdbCfg.retentions[1].freq > 0) { if (pCfg->tsdbCfg.retentions[1].rtn.freq > 0) {
++nRetention; ++nRetention;
if (pCfg->tsdbCfg.retentions[2].freq > 0) { if (pCfg->tsdbCfg.retentions[2].rtn.freq > 0) {
++nRetention; ++nRetention;
} }
} }
@ -118,11 +118,12 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
tjsonAddItemToObject(pJson, "retentions", pNodeRetentions); tjsonAddItemToObject(pJson, "retentions", pNodeRetentions);
for (int32_t i = 0; i < nRetention; ++i) { for (int32_t i = 0; i < nRetention; ++i) {
SJson *pNodeRetention = tjsonCreateObject(); SJson *pNodeRetention = tjsonCreateObject();
const SRetention *pRetention = pCfg->tsdbCfg.retentions + i; const SRetentionEx *pRetention = pCfg->tsdbCfg.retentions + i;
tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq); tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->rtn.freq);
tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit); tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->rtn.freqUnit);
tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->keep); tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->rtn.keep);
tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->keepUnit); tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->rtn.keepUnit);
tjsonAddIntegerToObject(pNodeRetention, "checkpointId", pRetention->checkpointId);
tjsonAddItemToArray(pNodeRetentions, pNodeRetention); tjsonAddItemToArray(pNodeRetentions, pNodeRetention);
} }
} }
@ -231,10 +232,12 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
for (int32_t i = 0; i < nRetention; ++i) { for (int32_t i = 0; i < nRetention; ++i) {
SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i); SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i);
ASSERT(pNodeRetention != NULL); ASSERT(pNodeRetention != NULL);
tjsonGetNumberValue(pNodeRetention, "freq", (pCfg->tsdbCfg.retentions)[i].freq, code); SRetentionEx *pRetention = &(pCfg->tsdbCfg.retentions[i]);
tjsonGetNumberValue(pNodeRetention, "freqUnit", (pCfg->tsdbCfg.retentions)[i].freqUnit, code); tjsonGetNumberValue(pNodeRetention, "freq", pRetention->rtn.freq, code);
tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep, code); tjsonGetNumberValue(pNodeRetention, "freqUnit", pRetention->rtn.freqUnit, code);
tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit, code); tjsonGetNumberValue(pNodeRetention, "keep", pRetention->rtn.keep, code);
tjsonGetNumberValue(pNodeRetention, "keepUnit", pRetention->rtn.keepUnit, code);
tjsonGetNumberValue(pNodeRetention, "checkpointId", pRetention->checkpointId, code);
} }
tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code); tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code);
if (code < 0) return -1; if (code < 0) return -1;

View File

@ -291,6 +291,9 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
if(syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit; if(syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit;
code = smaPrepareAsyncCommit(pVnode->pSma);
if (code) goto _exit;
pVnode->state.commitTerm = pVnode->state.applyTerm; pVnode->state.commitTerm = pVnode->state.applyTerm;
pInfo->info.config = pVnode->config; pInfo->info.config = pVnode->config;
@ -313,9 +316,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
metaPrepareAsyncCommit(pVnode->pMeta); metaPrepareAsyncCommit(pVnode->pMeta);
code = smaPrepareAsyncCommit(pVnode->pSma);
if (code) goto _exit;
taosThreadMutexLock(&pVnode->mutex); taosThreadMutexLock(&pVnode->mutex);
ASSERT(pVnode->onCommit == NULL); ASSERT(pVnode->onCommit == NULL);
pVnode->onCommit = pVnode->inUse; pVnode->onCommit = pVnode->inUse;

View File

@ -5,7 +5,7 @@ sleep 50
sql connect sql connect
#todo wait for streamState checkpoint #todo wait for streamState checkpoint
return 1 #return 1
print =============== create database with retentions print =============== create database with retentions
sql create database d0 retentions -:7d,5m:21d,15m:365d; sql create database d0 retentions -:7d,5m:21d,15m:365d;