diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6480aa9104..3e8574f49f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3302,12 +3302,15 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { STsdbReader* reader = pInfo->base.dataReader; while (true) { if (pInfo->rtnNextDurationBlocks) { + qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", + GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]; ++pInfo->nextDurationBlocksIdx; } else { for (int32_t i = 0; i < pInfo->numNextDurationBlocks; ++i) { blockDataDestroy(pInfo->nextDurationBlocks[i]); + pInfo->nextDurationBlocks[i] = NULL; } pInfo->rtnNextDurationBlocks = false; pInfo->nextDurationBlocksIdx = 0; @@ -3320,7 +3323,8 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { bool bSkipped = false; doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); pBlock = pInfo->pReaderBlock; - + qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", + GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); if (bFinished) { pInfo->bNewFilesetEvent = false; break; @@ -3330,7 +3334,11 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { if (!bSkipped) { pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true); ++pInfo->numNextDurationBlocks; - ASSERT(pInfo->numNextDurationBlocks <= 2); + if (pInfo->numNextDurationBlocks > 2) { + qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), pInfo->numNextDurationBlocks); + pInfo->bNewFilesetEvent = false; + break; + } } if (pInfo->bNewFilesetEvent) { pInfo->rtnNextDurationBlocks = true; @@ -3394,6 +3402,8 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* } else if (type == TSD_READER_NOTIFY_NEXT_DURATION_BLOCK) { pTmsInfo->bNextDurationBlockEvent = true; } + qDebug("table merge scan receive notification. type %d, fileset %d", type, info->duration.filesetId); + return; } @@ -3403,6 +3413,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; + qDebug("%s table merge scan start duration ", GET_TASKID(pTaskInfo)); pInfo->bNewFilesetEvent = false; pInfo->bNextDurationBlockEvent = false; @@ -3434,6 +3445,8 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + qDebug("%s table merge scan stop duration ", GET_TASKID(pTaskInfo)); SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod; @@ -3451,6 +3464,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SReadHandle* pHandle = &pInfo->base.readHandle; SStorageAPI* pAPI = &pTaskInfo->storageAPI; + qDebug("%s table merge scan start group %"PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId); { size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); @@ -3498,10 +3512,19 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader); pInfo->base.dataReader = NULL; } - + for (int32_t i = 0; i < pInfo->numNextDurationBlocks; ++i) { + if (pInfo->nextDurationBlocks[i]) { + blockDataDestroy(pInfo->nextDurationBlocks[i]); + pInfo->nextDurationBlocks[i] = NULL; + } + pInfo->numNextDurationBlocks = 0; + pInfo->nextDurationBlocksIdx = 0; + } resetLimitInfoForNextGroup(&pInfo->limitInfo); taosHashCleanup(pInfo->mSkipTables); pInfo->mSkipTables = NULL; + qDebug("%s table merge scan stop group %"PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId); + return TSDB_CODE_SUCCESS; } @@ -3612,6 +3635,13 @@ void destroyTableMergeScanOperatorInfo(void* param) { pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); pTableScanInfo->base.dataReader = NULL; + for (int32_t i = 0; i < pTableScanInfo->numNextDurationBlocks; ++i) { + if (pTableScanInfo->nextDurationBlocks[i] != NULL) { + blockDataDestroy(pTableScanInfo->nextDurationBlocks[i]); + pTableScanInfo->nextDurationBlocks[i] = NULL; + } + } + taosArrayDestroy(pTableScanInfo->sortSourceParams); tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL;