diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index e29583d8fc..a3ec2cca74 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -297,6 +297,8 @@ typedef struct STableMergeScanInfo { SHashObj* mSkipTables; int64_t mergeLimit; SSortExecInfo sortExecInfo; + bool bNewDuration; + SSDataBlock* pNewDurationBlock; } STableMergeScanInfo; typedef struct STagScanFilterContext { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b88bf1ff56..e8213d0597 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3242,6 +3242,12 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { STsdbReader* reader = pInfo->base.dataReader; while (true) { + if (pInfo->pNewDurationBlock) { + SSDataBlock* pNewDurationBlock = pInfo->pNewDurationBlock; + pInfo->pNewDurationBlock = NULL; + return pNewDurationBlock; + } + code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); if (code != 0) { pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); @@ -3267,7 +3273,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { uint32_t status = 0; code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); - // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo)); T_LONG_JMP(pTaskInfo->env, code); @@ -3290,6 +3295,11 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + + if (pInfo->bNewDuration) { + pInfo->pNewDurationBlock = pBlock; + return NULL; + } return pBlock; } @@ -3327,7 +3337,8 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId); - + STableMergeScanInfo* pTmsInfo = param; + pTmsInfo->bNewDuration = true; return; } @@ -3337,6 +3348,8 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; + pInfo->bNewDuration = false; + pInfo->sortBufSize = 2048 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, @@ -3354,7 +3367,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { ps->param = param; ps->onlyRef = false; tsortAddSource(pInfo->pSortHandle, ps); - + if (numOfTable == 1) { tsortSetSingleTableMerge(pInfo->pSortHandle); } else { @@ -3510,17 +3523,23 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { pOperator->resultInfo.totalRows += pBlock->info.rows; return pBlock; } else { - // Data of this group are all dumped, let's try the next group - stopGroupTableMergeScan(pOperator); - if (pInfo->tableEndIndex >= tableListSize - 1) { - setOperatorCompleted(pOperator); - break; - } + if (pInfo->bNewDuration) { + stopDurationForGroupTableMergeScan(pOperator); + startDurationForGroupTableMergeScan(pOperator); - pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId; - startGroupTableMergeScan(pOperator); - resetLimitInfoForNextGroup(&pInfo->limitInfo); + } else { + // Data of this group are all dumped, let's try the next group + stopGroupTableMergeScan(pOperator); + if (pInfo->tableEndIndex >= tableListSize - 1) { + setOperatorCompleted(pOperator); + break; + } + + pInfo->tableStartIndex = pInfo->tableEndIndex + 1; + pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId; + startGroupTableMergeScan(pOperator); + resetLimitInfoForNextGroup(&pInfo->limitInfo); + } } }