chore: rsma checkpoint

This commit is contained in:
kailixu 2023-11-07 19:59:05 +08:00
parent 5f4aa0c445
commit 6b738884a4
11 changed files with 27 additions and 52 deletions

View File

@ -249,6 +249,7 @@ typedef struct SQueryTableDataCond {
SColumnInfo* colList;
int32_t* pSlotList; // the column output destation slot, and it may be null
int32_t type; // data block load type:
bool skipRollup;
STimeWindow twindows;
int64_t startVersion;
int64_t endVersion;

View File

@ -49,6 +49,7 @@ typedef struct {
uint64_t checkpointId;
bool initTableReader;
bool initTqReader;
bool skipRollup;
int32_t numOfVgroups;
void* sContext; // SSnapContext*
void* pStateBackend;

View File

@ -801,6 +801,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta);

View File

@ -235,6 +235,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
return TSDB_CODE_SUCCESS;
}
#if 0
static int64_t tdRSmaTaskGetCheckpointId(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
int64_t checkpointId = -1;
STaskId id = {.streamId = streamId, .taskId = taskId};
@ -246,6 +247,7 @@ static int64_t tdRSmaTaskGetCheckpointId(SStreamMeta *pMeta, int64_t streamId, i
taosRUnLockLatch(&pMeta->lock);
return checkpointId;
}
#endif
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
streamMetaUnregisterTask(pMeta, streamId, taskId);
@ -293,8 +295,12 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_TASK_FLAG) + 1);
sprintf(pStreamTask->exec.qmsg, "%s", RSMA_TASK_FLAG);
#if 0
pStreamTask->chkInfo.checkpointId =
tdRSmaTaskGetCheckpointId(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId);
#else
pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
#endif
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
@ -304,7 +310,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
tdRSmaTaskRemove(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId);
SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState};
SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .skipRollup = 1, .pStateBackend = pStreamState};
initStorageAPI(&handle.api);
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0);
if (!pRSmaInfo->taskInfo[idx]) {
@ -682,8 +688,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
", ver:%" PRIi64,
SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1,
output ? output->info.version : -1);
continue;
// TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64,
@ -1297,7 +1302,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
}
int8_t fetchTriggerStat =
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_ACTIVE);
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
switch (fetchTriggerStat) {
case TASK_TRIGGER_STAT_ACTIVE: {
smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",

View File

@ -30,7 +30,7 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN
offset = strlen(outputName);
// rsma
#if 0
#if 1
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s", TD_DIRSEP, VNODE_RSMA_DIR,
(endWithSep ? TD_DIRSEP : ""));
#else

View File

@ -48,9 +48,9 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost);
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
int8_t* pLevel);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost);
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t doBuildDataBlock(STsdbReader* pReader);
@ -384,7 +384,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
initReaderStatus(&pReader->status);
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->info.suid = pCond->suid;
pReader->info.order = pCond->order;
@ -3140,9 +3140,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
}
}
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idStr,
int8_t* pLevel) {
if (VND_IS_RSMA(pVnode)) {
if (VND_IS_RSMA(pVnode) && !pCond->skipRollup) {
int8_t level = 0;
int8_t precision = pVnode->config.tsdbCfg.precision;
int64_t now = taosGetTimestamp(precision);
@ -3158,7 +3158,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
}
break;
}
if ((now - pRetention->keep) <= (winSKey + offset)) {
if ((now - pRetention->keep) <= (pCond->twindows.skey + offset)) {
break;
}
++level;

View File

@ -178,7 +178,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle);
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
int32_t convertFillType(int32_t mode);

View File

@ -1713,7 +1713,7 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
return c;
}
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle) {
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
@ -1732,6 +1732,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = -1;
pCond->skipRollup = readHandle->skipRollup;
int32_t j = 0;
for (int32_t i = 0; i < pCond->numOfCols; ++i) {

View File

@ -1035,7 +1035,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
}
initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode, readHandle);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -3533,7 +3533,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
goto _error;
}
code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode, readHandle);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pInfo->base.matchInfo.pList);
goto _error;

View File

@ -28,7 +28,6 @@ int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0;
int32_t streamMetaId = 0;
static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
static void metaHbToMnode(void* param, void* tmrId);
static void streamMetaClear(SStreamMeta* pMeta);
static int32_t streamMetaBegin(SStreamMeta* pMeta);
@ -188,7 +187,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->chkpCap = 2;
taosInitRWLatch(&pMeta->chkpDirLock);
pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
pMeta->chkpId = streamMetaGetLatestCheckpointId(pMeta);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
while (pMeta->streamBackend == NULL) {
taosMsleep(100);
@ -595,7 +594,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) {
return 0;
}
int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
int64_t chkpId = 0;
TBC* pCur = NULL;

View File

@ -527,17 +527,6 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
return pFileState->usedBuffs;
}
static void getDebugRowBuff(char* val, int32_t vlen, char* output) {
for (int32_t i = 0; i < vlen; ++i) {
if (*(val + i) == '\0') {
sprintf(output + i, "0");
} else {
sprintf(output + i, "%c", *(val + i));
}
}
output[vlen] = 0;
}
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
int32_t code = TSDB_CODE_SUCCESS;
SListIter iter = {0};
@ -553,7 +542,6 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1;
char* buf = taosMemoryCalloc(1, len);
char output[1024];
void* batch = streamStateCreateBatch();
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
@ -571,15 +559,6 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
}
void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
#if 1
SStateKey* pStateKey = pSKey;
char* pStateVal = pPos->pRowBuff;
int32_t pStateVLen = pFileState->rowSize;
assert(pStateVLen < 1024);
getDebugRowBuff(pStateVal, pStateVLen, output);
qDebug("%s:%d key:[%" PRIu64 ",%" PRIi64 ",%" PRIi64 "] vlen:%d, val:%s", __func__, __LINE__, pStateKey->key.groupId,
pStateKey->key.ts, pStateKey->opNum, pStateVLen, output);
#endif
code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
0, buf);
taosMemoryFreeClear(pSKey);
@ -710,7 +689,6 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
if (pCur == NULL) {
return -1;
}
char output[1024];
int32_t recoverNum = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
while (code == TSDB_CODE_SUCCESS) {
if (pFileState->curRowCount >= recoverNum) {
@ -730,17 +708,6 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
}
ASSERT(vlen == pFileState->rowSize);
memcpy(pNewPos->pRowBuff, pVal, vlen);
#if 1
SStateKey* pStateKey = pNewPos->pKey;
char* pStateVal = pVal;
int32_t pStateVLen = vlen;
assert(pStateVLen < 1024);
getDebugRowBuff(pStateVal, pStateVLen, output);
qDebug("%s:%d key:[%" PRIu64 ",%" PRIi64 ",%" PRIi64 "] vlen:%d, val:%s", __func__, __LINE__, pStateKey->key.groupId,
pStateKey->key.ts, pStateKey->opNum, pStateVLen, output);
#endif
taosMemoryFreeClear(pVal);
pNewPos->beFlushed = true;
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);