diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h
index 712ae7c95b..2402182eae 100644
--- a/include/libs/executor/storageapi.h
+++ b/include/libs/executor/storageapi.h
@@ -153,7 +153,8 @@ typedef struct {
 // clang-format off
 /*-------------------------------------------------new api format---------------------------------------------------*/
 typedef enum {
-  TSD_READER_NOTIFY_DURATION_START
+  TSD_READER_NOTIFY_DURATION_START,
+  TSD_READER_NOTIFY_NEXT_DURATION_BLOCK,
 } ETsdReaderNotifyType;
 
 typedef union {
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c
index e1dc226bb9..6f9f81773a 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead2.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c
@@ -4332,12 +4332,7 @@ static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) {
       STsdReaderNotifyInfo info = {0};
       info.duration.filesetId = fid;
       pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
-      tsdbDebug("new duration %d start notification when no buffer preceeding fileset, %s", fid, pReader->idStr);
-    }
-    if (pStatus->pNextFilesetBlock && pStatus->pNextFilesetBlock->info.rows > 0) {
-      tsdbDebug("return the saved block from fileset %d files, %s", fid, pReader->idStr);
-      copyDataBlock(pBlock, pStatus->pNextFilesetBlock);
-      blockDataDestroy(pStatus->pNextFilesetBlock);
+      tsdbDebug("new duration %d start notification when buffer pre-fileset, %s", fid, pReader->idStr);
     }
   }
   return code;
@@ -4364,12 +4359,12 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
     tsdbTrace("block from file rows: %"PRId64", will process pre-file set buffer: %d. %s", pBlock->info.rows,
               pStatus->bProcMemFirstFileset, pReader->idStr);
     if (pStatus->bProcMemPreFileset) {
-      if ( pBlock->info.rows > 0) {
-        pStatus->pNextFilesetBlock = createOneDataBlock(pBlock, true);
-        blockDataCleanup(pBlock);
-        code = buildFromPreFilesetBuffer(pReader);
-        if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
-          return code;
+      if (pBlock->info.rows > 0) {
+        if (pReader->notifyFn) {
+          int32_t fid = pReader->status.pCurrentFileset->fid;
+          STsdReaderNotifyInfo info = {0};
+          info.duration.filesetId = fid;
+          pReader->notifyFn(TSD_READER_NOTIFY_NEXT_DURATION_BLOCK, &info, pReader->notifyParam);
         }
       } else {
         pStatus->bProcMemPreFileset = false;
diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h
index fe3441d058..3679015e9c 100644
--- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h
+++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h
@@ -220,7 +220,6 @@ typedef struct SReaderStatus {
   bool                  bProcMemFirstFileset;
   STableUidList         procMemUidList;
   STableBlockScanInfo** pProcMemTableIter;
-  SSDataBlock*          pNextFilesetBlock;
 } SReaderStatus;
 
 struct STsdbReader {
diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h
index f7e55b71be..e3e504cdbc 100644
--- a/source/libs/executor/inc/executorInt.h
+++ b/source/libs/executor/inc/executorInt.h
@@ -298,9 +298,14 @@ typedef struct STableMergeScanInfo {
   SHashObj*      mSkipTables;
   int64_t        mergeLimit;
   SSortExecInfo  sortExecInfo;
 
-  bool           bNewFileset;
-  bool           bOnlyRetrieveBlock;
+  bool           filesetDelimited;
+  bool           bNewFilesetEvent;
+  bool           bNextDurationBlockEvent;
+  int32_t        numNextDurationBlocks;
+  SSDataBlock*   nextDurationBlocks[2];
+  bool           rtnNextDurationBlocks;
+  int32_t        nextDurationBlocksIdx;
 } STableMergeScanInfo;
 
 typedef struct STagScanFilterContext {
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index ea73f60468..bce8325195 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -3240,6 +3240,53 @@ static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock
   return TSDB_CODE_SUCCESS;
 }
 
+static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
+  STableMergeScanInfo* pInfo = pOperator->info;
+  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
+  SStorageAPI*         pAPI = &pTaskInfo->storageAPI;
+
+  SSDataBlock* pBlock = pInfo->pReaderBlock;
+  int32_t      code = 0;
+  bool         hasNext = false;
+  STsdbReader* reader = pInfo->base.dataReader;
+
+  code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
+  if (code != 0) {
+    pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
+    qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
+    T_LONG_JMP(pTaskInfo->env, code);
+  }
+
+  if (!hasNext || isTaskKilled(pTaskInfo)) {
+    if (isTaskKilled(pTaskInfo)) {
+      qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
+      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
+    }
+    *pFinished = true;
+    return;
+  }
+
+  uint32_t status = 0;
+  code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
+
+  if (code != TSDB_CODE_SUCCESS) {
+    qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
+    T_LONG_JMP(pTaskInfo->env, code);
+  }
+
+  if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
+    *pFinished = true;
+    return;
+  }
+
+  // current block is filter out according to filter condition, continue load the next block
+  if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
+    *pSkipped = true;
+    return;
+  }
+  return;
+}
+
 static SSDataBlock* getBlockForTableMergeScan(void* param) {
   STableMergeScanSortSourceParam* source = param;
   SOperatorInfo*                  pOperator = source->pOperator;
@@ -3255,53 +3302,42 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
   STsdbReader* reader = pInfo->base.dataReader;
   while (true) {
-    if (!pInfo->bOnlyRetrieveBlock) {
-      code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
-      if (code != 0) {
-        pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
-        qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
-        T_LONG_JMP(pTaskInfo->env, code);
-      }
-
-      if (!hasNext || isTaskKilled(pTaskInfo)) {
-        pInfo->bNewFileset = false;
-        if (isTaskKilled(pTaskInfo)) {
-          qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
-          pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
+    if (pInfo->rtnNextDurationBlocks) {
+      if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) {
+        copyDataBlock(pBlock, pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]);
+        blockDataDestroy(pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]);
+        ++pInfo->nextDurationBlocksIdx;
+        if (pInfo->nextDurationBlocksIdx >= pInfo->numNextDurationBlocks) {
+          pInfo->rtnNextDurationBlocks = false;
+          pInfo->nextDurationBlocksIdx = 0;
         }
+      }
+    } else {
+      bool bFinished = false;
+      bool bSkipped = false;
+      doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
+      if (bFinished) {
+        pInfo->bNewFilesetEvent = false;
        break;
      }
-      if (pInfo->bNewFileset) {
-        pInfo->bOnlyRetrieveBlock = true;
-        return NULL;
+      if (pInfo->bNextDurationBlockEvent || pInfo->bNewFilesetEvent) {
+        if (!bSkipped) {
+          pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true);
+          ++pInfo->numNextDurationBlocks;
+          ASSERT(pInfo->numNextDurationBlocks <= 2);
+        }
+        if (pInfo->bNewFilesetEvent) {
+          pInfo->rtnNextDurationBlocks = true;
+          return NULL;
+        }
+        if (pInfo->bNextDurationBlockEvent) {
+          pInfo->bNextDurationBlockEvent = false;
+          continue;
+        }
      }
+      if (bSkipped) continue;
    }
-    // process this data block based on the probabilities
-    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
-    if (!processThisBlock) {
-      continue;
-    }
-
-    uint32_t status = 0;
-    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
-    if (pInfo->bOnlyRetrieveBlock) {
-      pInfo->bOnlyRetrieveBlock = false;
-    }
-    if (code != TSDB_CODE_SUCCESS) {
-      qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
-      T_LONG_JMP(pTaskInfo->env, code);
-    }
-
-    if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
-      break;
-    }
-
-    // current block is filter out according to filter condition, continue load the next block
-    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
-      continue;
-    }
-
     pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
 
     if (pInfo->mergeLimit != -1) {
@@ -3317,6 +3353,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
   return NULL;
 }
 
+
 SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
   int32_t tsTargetSlotId = 0;
   for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {
@@ -3348,7 +3385,11 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
 
 void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
   STableMergeScanInfo* pTmsInfo = param;
-  pTmsInfo->bNewFileset = true;
+  if (type == TSD_READER_NOTIFY_DURATION_START) {
+    pTmsInfo->bNewFilesetEvent = true;
+  } else if (type == TSD_READER_NOTIFY_NEXT_DURATION_BLOCK) {
+    pTmsInfo->bNextDurationBlockEvent = true;
+  }
   return;
 }
 
@@ -3358,7 +3399,11 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
   int32_t code = TSDB_CODE_SUCCESS;
   int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
 
-  pInfo->bNewFileset = false;
+  pInfo->bNewFilesetEvent = false;
+  pInfo->bNextDurationBlockEvent = false;
+  pInfo->numNextDurationBlocks = 0;
+  pInfo->nextDurationBlocksIdx = 0;
+  pInfo->rtnNextDurationBlocks = false;
 
   pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
   int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
@@ -3535,7 +3580,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
       pOperator->resultInfo.totalRows += pBlock->info.rows;
       return pBlock;
     } else {
-      if (pInfo->bNewFileset) {
+      if (pInfo->bNewFilesetEvent) {
        stopDurationForGroupTableMergeScan(pOperator);
        startDurationForGroupTableMergeScan(pOperator);
      } else {