diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 72aab9adf0..e072eaa831 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -249,6 +249,7 @@ typedef struct SQueryTableDataCond { SColumnInfo* colList; int32_t* pSlotList; // the column output destation slot, and it may be null int32_t type; // data block load type: + bool skipRollup; STimeWindow twindows; int64_t startVersion; int64_t endVersion; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 5990ae1c9c..6005c13455 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -49,6 +49,7 @@ typedef struct { uint64_t checkpointId; bool initTableReader; bool initTqReader; + bool skipRollup; int32_t numOfVgroups; void* sContext; // SSnapContext* void* pStateBackend; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 173c68a818..2e9eb884a0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -801,6 +801,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaReopen(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); +int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ac99dc9de3..665610304c 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -235,6 +235,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui return TSDB_CODE_SUCCESS; } +#if 0 static int64_t tdRSmaTaskGetCheckpointId(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) { int64_t checkpointId = -1; STaskId id = {.streamId = streamId, .taskId = taskId}; @@ -246,6 +247,7 @@ static int64_t tdRSmaTaskGetCheckpointId(SStreamMeta *pMeta, int64_t streamId, i taosRUnLockLatch(&pMeta->lock); return checkpointId; } +#endif static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) { streamMetaUnregisterTask(pMeta, streamId, taskId); @@ -293,8 +295,12 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pStreamTask->pMeta = pVnode->pTq->pStreamMeta; pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_TASK_FLAG) + 1); sprintf(pStreamTask->exec.qmsg, "%s", RSMA_TASK_FLAG); +#if 0 pStreamTask->chkInfo.checkpointId = tdRSmaTaskGetCheckpointId(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId); +#else + pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta); +#endif pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); if (!pStreamState) { terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; @@ -304,7 +310,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat tdRSmaTaskRemove(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId); - SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState}; + SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .skipRollup = 1, .pStateBackend = pStreamState}; initStorageAPI(&handle.api); pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0); if (!pRSmaInfo->taskInfo[idx]) { @@ -682,8 +688,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma ", ver:%" PRIi64, SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1, output ? output->info.version : -1); - continue; - // TSDB_CHECK_CODE(code, lino, _exit); + TSDB_CHECK_CODE(code, lino, _exit); } smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64, @@ -1297,7 +1302,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { } int8_t fetchTriggerStat = - atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_ACTIVE); + atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); switch (fetchTriggerStat) { case TASK_TRIGGER_STAT_ACTIVE: { smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 479c57e65f..8c04306d0f 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -30,7 +30,7 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN offset = strlen(outputName); // rsma -#if 0 +#if 1 snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s", TD_DIRSEP, VNODE_RSMA_DIR, (endWithSep ? TD_DIRSEP : "")); #else diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index d1919d95ba..c56164ff9d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -48,9 +48,9 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader); -static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost); -static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, - int8_t* pLevel); +static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost); +static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr, + int8_t* pLevel); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); static int32_t doBuildDataBlock(STsdbReader* pReader); @@ -384,7 +384,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void initReaderStatus(&pReader->status); - pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level); + pReader->pTsdb = getTsdbByRetentions(pVnode, pCond, pVnode->config.tsdbCfg.retentions, idstr, &level); pReader->info.suid = pCond->suid; pReader->info.order = pCond->order; @@ -3140,9 +3140,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } } -static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr, +static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idStr, int8_t* pLevel) { - if (VND_IS_RSMA(pVnode)) { + if (VND_IS_RSMA(pVnode) && !pCond->skipRollup) { int8_t level = 0; int8_t precision = pVnode->config.tsdbCfg.precision; int64_t now = taosGetTimestamp(precision); @@ -3158,7 +3158,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret } break; } - if ((now - pRetention->keep) <= (winSKey + offset)) { + if ((now - pRetention->keep) <= (pCond->twindows.skey + offset)) { break; } ++level; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 740ff7b0dc..6387b3d0d6 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -178,7 +178,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); SColumn extractColumnFromColumnNode(SColumnNode* pColNode); -int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); +int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle); void cleanupQueryTableDataCond(SQueryTableDataCond* pCond); int32_t convertFillType(int32_t mode); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 753d3e680c..39b47504c6 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1713,7 +1713,7 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) { return c; } -int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) { +int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle) { pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); @@ -1732,6 +1732,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi pCond->type = TIMEWINDOW_RANGE_CONTAINED; pCond->startVersion = -1; pCond->endVersion = -1; + pCond->skipRollup = readHandle->skipRollup; int32_t j = 0; for (int32_t i = 0; i < pCond->numOfCols; ++i) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index efbc978323..c47e14ad0d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1035,7 +1035,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, } initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo); - code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode); + code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode, readHandle); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -3533,7 +3533,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN goto _error; } - code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode); + code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode, readHandle); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pInfo->base.matchInfo.pList); goto _error; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 31f8647dd5..6202753a87 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -28,7 +28,6 @@ int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; int32_t streamMetaId = 0; -static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); static void metaHbToMnode(void* param, void* tmrId); static void streamMetaClear(SStreamMeta* pMeta); static int32_t streamMetaBegin(SStreamMeta* pMeta); @@ -188,7 +187,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->chkpCap = 2; taosInitRWLatch(&pMeta->chkpDirLock); - pMeta->chkpId = streamGetLatestCheckpointId(pMeta); + pMeta->chkpId = streamMetaGetLatestCheckpointId(pMeta); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); while (pMeta->streamBackend == NULL) { taosMsleep(100); @@ -595,7 +594,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) { return 0; } -int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { +int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { int64_t chkpId = 0; TBC* pCur = NULL; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 0a3970adaa..e38ba85f62 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -527,17 +527,6 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { return pFileState->usedBuffs; } -static void getDebugRowBuff(char* val, int32_t vlen, char* output) { - for (int32_t i = 0; i < vlen; ++i) { - if (*(val + i) == '\0') { - sprintf(output + i, "0"); - } else { - sprintf(output + i, "%c", *(val + i)); - } - } - output[vlen] = 0; -} - int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) { int32_t code = TSDB_CODE_SUCCESS; SListIter iter = {0}; @@ -553,7 +542,6 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1; char* buf = taosMemoryCalloc(1, len); - char output[1024]; void* batch = streamStateCreateBatch(); while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) { @@ -571,15 +559,6 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number); -#if 1 - SStateKey* pStateKey = pSKey; - char* pStateVal = pPos->pRowBuff; - int32_t pStateVLen = pFileState->rowSize; - assert(pStateVLen < 1024); - getDebugRowBuff(pStateVal, pStateVLen, output); - qDebug("%s:%d key:[%" PRIu64 ",%" PRIi64 ",%" PRIi64 "] vlen:%d, val:%s", __func__, __LINE__, pStateKey->key.groupId, - pStateKey->key.ts, pStateKey->opNum, pStateVLen, output); -#endif code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize, 0, buf); taosMemoryFreeClear(pSKey); @@ -710,7 +689,6 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { if (pCur == NULL) { return -1; } - char output[1024]; int32_t recoverNum = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount); while (code == TSDB_CODE_SUCCESS) { if (pFileState->curRowCount >= recoverNum) { @@ -730,17 +708,6 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { } ASSERT(vlen == pFileState->rowSize); memcpy(pNewPos->pRowBuff, pVal, vlen); - -#if 1 - SStateKey* pStateKey = pNewPos->pKey; - char* pStateVal = pVal; - int32_t pStateVLen = vlen; - assert(pStateVLen < 1024); - getDebugRowBuff(pStateVal, pStateVLen, output); - qDebug("%s:%d key:[%" PRIu64 ",%" PRIi64 ",%" PRIi64 "] vlen:%d, val:%s", __func__, __LINE__, pStateKey->key.groupId, - pStateKey->key.ts, pStateKey->opNum, pStateVLen, output); -#endif - taosMemoryFreeClear(pVal); pNewPos->beFlushed = true; code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);