diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index a3ec2cca74..b190cd815c 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -298,7 +298,7 @@ typedef struct STableMergeScanInfo { int64_t mergeLimit; SSortExecInfo sortExecInfo; bool bNewDuration; - SSDataBlock* pNewDurationBlock; + bool bOnlyRetrieveBlock; } STableMergeScanInfo; typedef struct STagScanFilterContext { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e8213d0597..bb3cf7b937 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3242,29 +3242,28 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { STsdbReader* reader = pInfo->base.dataReader; while (true) { - if (pInfo->pNewDurationBlock) { - SSDataBlock* pNewDurationBlock = pInfo->pNewDurationBlock; - pInfo->pNewDurationBlock = NULL; - return pNewDurationBlock; - } + if (!pInfo->bOnlyRetrieveBlock) { + code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); + if (code != 0) { + pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); + qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo)); + T_LONG_JMP(pTaskInfo->env, code); + } - code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); - if (code != 0) { - pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); - qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo)); - T_LONG_JMP(pTaskInfo->env, code); - } + if (!hasNext || isTaskKilled(pTaskInfo)) { + pInfo->bNewDuration = false; + if (isTaskKilled(pTaskInfo)) { + qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo)); + pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); + } + break; + } - if (!hasNext) { - break; + if (pInfo->bNewDuration) { + pInfo->bOnlyRetrieveBlock = true; + return NULL; + } } - - if (isTaskKilled(pTaskInfo)) { - qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo)); - pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); - break; - } - // process this data block based on the probabilities bool processThisBlock = processBlockWithProbability(&pInfo->sample); if (!processThisBlock) { @@ -3273,6 +3272,9 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { uint32_t status = 0; code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); + if (pInfo->bOnlyRetrieveBlock) { + pInfo->bOnlyRetrieveBlock = false; + } if (code != TSDB_CODE_SUCCESS) { qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo)); T_LONG_JMP(pTaskInfo->env, code); @@ -3295,11 +3297,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - - if (pInfo->bNewDuration) { - pInfo->pNewDurationBlock = pBlock; - return NULL; - } + return pBlock; } @@ -3336,7 +3334,6 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* } void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { - uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId); STableMergeScanInfo* pTmsInfo = param; pTmsInfo->bNewDuration = true; return;