diff --git a/docs/en/12-taos-sql/05-insert.md b/docs/en/12-taos-sql/05-insert.md index e5f502c563..f6e39a9734 100644 --- a/docs/en/12-taos-sql/05-insert.md +++ b/docs/en/12-taos-sql/05-insert.md @@ -158,8 +158,8 @@ Automatically creating table and the table name is specified through the `tbname ```sql INSERT INTO meters(tbname, location, groupId, ts, current, phase) - values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 219, 0.32) - values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 217, 0.33) - values('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 217, 0.33) + values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 0.32) + ('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 0.33) + ('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 0.33) ``` diff --git a/docs/zh/12-taos-sql/05-insert.md b/docs/zh/12-taos-sql/05-insert.md index efcd5dd962..583d047c43 100644 --- a/docs/zh/12-taos-sql/05-insert.md +++ b/docs/zh/12-taos-sql/05-insert.md @@ -158,7 +158,7 @@ INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) FILE '/tmp/c 自动建表, 表名通过tbname列指定 ```sql INSERT INTO meters(tbname, location, groupId, ts, current, phase) - values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 219, 0.32) - values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 217, 0.33) - values('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 217, 0.33) + values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 0.32) + ('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 0.33) + ('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 0.33) ``` diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 712ae7c95b..2402182eae 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -153,7 +153,8 @@ typedef struct { // clang-format off /*-------------------------------------------------new api format---------------------------------------------------*/ typedef enum { - TSD_READER_NOTIFY_DURATION_START + TSD_READER_NOTIFY_DURATION_START, + TSD_READER_NOTIFY_NEXT_DURATION_BLOCK, } ETsdReaderNotifyType; typedef union { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 4cca39f220..f6fcfc6be1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -76,6 +76,8 @@ static void updateComposedBlockInfo(STsdbReader* pReader, double el, ST static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } +static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus); + static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) { pSupInfo->smaValid = true; @@ -2565,7 +2567,9 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { } if (pReader->status.bProcMemPreFileset) { - resetTableListIndex(&pReader->status); + tsdbDebug("will start pre-fileset %d buffer processing. 
%s", fid, pReader->idStr); + pReader->status.procMemUidList.tableUidList = pReader->status.uidList.tableUidList; + resetPreFilesetMemTableListIndex(&pReader->status); } if (!pReader->status.bProcMemPreFileset) { @@ -2573,6 +2577,7 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { STsdReaderNotifyInfo info = {0}; info.duration.filesetId = fid; pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); + tsdbDebug("new duration %d start notification when no buffer preceeding fileset, %s", fid, pReader->idStr); } } @@ -2643,6 +2648,14 @@ static void resetTableListIndex(SReaderStatus* pStatus) { pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); } +static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus) { + STableUidList* pList = &pStatus->procMemUidList; + + pList->currentIndex = 0; + uint64_t uid = pList->tableUidList[0]; + pStatus->pProcMemTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); +} + static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) { pOrderedCheckInfo->currentIndex += 1; if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) { @@ -2655,6 +2668,19 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt return (pStatus->pTableIter != NULL); } +static bool moveToNextTableForPreFileSetMem(SReaderStatus* pStatus) { + STableUidList* pUidList = &pStatus->procMemUidList; + pUidList->currentIndex += 1; + if (pUidList->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) { + pStatus->pProcMemTableIter = NULL; + return false; + } + + uint64_t uid = pUidList->tableUidList[pUidList->currentIndex]; + pStatus->pProcMemTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); + return (pStatus->pProcMemTableIter != NULL); +} + static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader; @@ -2889,6 +2915,47 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return (pReader->code != TSDB_CODE_SUCCESS) ? 
 }
 
+static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_t endKey) {
+  SReaderStatus* pStatus = &pReader->status;
+
+  tsdbDebug("seq load data blocks from cache that precedes fileset %d, %s", pReader->status.pCurrentFileset->fid, pReader->idStr);
+
+  while (1) {
+    if (pReader->code != TSDB_CODE_SUCCESS) {
+      tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
+      return pReader->code;
+    }
+
+    STableBlockScanInfo** pBlockScanInfo = pStatus->pProcMemTableIter;
+    if (pReader->pIgnoreTables &&
+        taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) {
+      bool hasNextTable = moveToNextTableForPreFileSetMem(pStatus);
+      if (!hasNextTable) {
+        return TSDB_CODE_SUCCESS;
+      }
+      continue;
+    }
+
+    initMemDataIterator(*pBlockScanInfo, pReader);
+    initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost);
+
+    int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey);
+    if (code != TSDB_CODE_SUCCESS) {
+      return code;
+    }
+
+    if (pReader->resBlockInfo.pResBlock->info.rows > 0) {
+      return TSDB_CODE_SUCCESS;
+    }
+
+    // current table is exhausted, let's try the next table
+    bool hasNextTable = moveToNextTableForPreFileSetMem(pStatus);
+    if (!hasNextTable) {
+      return TSDB_CODE_SUCCESS;
+    }
+  }
+}
+
 static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t endKey) {
   SReaderStatus* pStatus = &pReader->status;
   STableUidList* pUidList = &pStatus->uidList;
@@ -4244,6 +4311,33 @@ _err:
   return code;
 }
 
+static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) {
+  int32_t code = TSDB_CODE_SUCCESS;
+  SReaderStatus* pStatus = &pReader->status;
+
+  SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
+
+  int32_t fid = pReader->status.pCurrentFileset->fid;
+  STimeWindow win = {0};
+  tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
+
+  int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey;
+  code = buildBlockFromBufferSeqForPreFileset(pReader, endKey);
+  if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
+    return code;
+  } else {
+    tsdbDebug("finished pre-fileset %d buffer processing. %s", fid, pReader->idStr);
+    pStatus->bProcMemPreFileset = false;
+    if (pReader->notifyFn) {
+      STsdReaderNotifyInfo info = {0};
+      info.duration.filesetId = fid;
+      pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
+      tsdbDebug("new duration %d start notification after pre-fileset buffer processed, %s", fid, pReader->idStr);
+    }
+  }
+  return code;
+}
+
 static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
   SReaderStatus* pStatus = &pReader->status;
   int32_t        code = TSDB_CODE_SUCCESS;
@@ -4251,22 +4345,9 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
 
   if (pStatus->loadFromFile) {
     if (pStatus->bProcMemPreFileset) {
-      int32_t fid = pReader->status.pCurrentFileset->fid;
-      STimeWindow win = {0};
-      tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
-
-      int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey;
-      code = buildBlockFromBufferSequentially(pReader, endKey);
+      code = buildFromPreFilesetBuffer(pReader);
       if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
         return code;
-      } else {
-        pStatus->bProcMemPreFileset = false;
-        if (pReader->notifyFn) {
-          STsdReaderNotifyInfo info = {0};
-          info.duration.filesetId = fid;
-          pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
-        }
-        resetTableListIndex(pStatus);
       }
     }
@@ -4275,6 +4356,21 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
       return code;
     }
 
+    tsdbTrace("block from file rows: %"PRId64", will process pre-fileset buffer: %d. %s",
+        pBlock->info.rows, pStatus->bProcMemFirstFileset, pReader->idStr);
+    if (pStatus->bProcMemPreFileset) {
+      if (pBlock->info.rows > 0) {
+        if (pReader->notifyFn) {
+          int32_t fid = pReader->status.pCurrentFileset->fid;
+          STsdReaderNotifyInfo info = {0};
+          info.duration.filesetId = fid;
+          pReader->notifyFn(TSD_READER_NOTIFY_NEXT_DURATION_BLOCK, &info, pReader->notifyParam);
+        }
+      } else {
+        pStatus->bProcMemPreFileset = false;
+      }
+    }
+
     if (pBlock->info.rows <= 0) {
       resetTableListIndex(&pReader->status);
       int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h
index 43cd499aca..3679015e9c 100644
--- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h
+++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h
@@ -210,12 +210,16 @@ typedef struct SReaderStatus {
   SArray*          pLDataIterArray;
   SRowMerger       merger;
   SColumnInfoData* pPrimaryTsCol;  // primary time stamp output col info data
+  // the following fields are for processing in-memory data that precedes the current fileset
+  // TODO: refactor into a separate struct
   bool             bProcMemPreFileset;
   int64_t          memTableMaxKey;
   int64_t          memTableMinKey;
   int64_t          prevFilesetStartKey;
   int64_t          prevFilesetEndKey;
   bool             bProcMemFirstFileset;
+  STableUidList    procMemUidList;
+  STableBlockScanInfo** pProcMemTableIter;
 } SReaderStatus;
 
 struct STsdbReader {
diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h
index f7e55b71be..e3e504cdbc 100644
--- a/source/libs/executor/inc/executorInt.h
+++ b/source/libs/executor/inc/executorInt.h
@@ -298,9 +298,14 @@ typedef struct STableMergeScanInfo {
   SHashObj*      mSkipTables;
   int64_t        mergeLimit;
   SSortExecInfo  sortExecInfo;
-  bool           bNewFileset;
-  bool           bOnlyRetrieveBlock;
+  bool           filesetDelimited;
+  bool           bNewFilesetEvent;
+  bool           bNextDurationBlockEvent;
+  int32_t        numNextDurationBlocks;
+  SSDataBlock*   nextDurationBlocks[2];
+  bool           rtnNextDurationBlocks;
+  int32_t        nextDurationBlocksIdx;
 } STableMergeScanInfo;
 
 typedef struct STagScanFilterContext {
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index cb58022569..ef2a99d1d1 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -3240,6 +3240,52 @@ static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock
   return TSDB_CODE_SUCCESS;
 }
 
+static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
+  STableMergeScanInfo* pInfo = pOperator->info;
+  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
+  SStorageAPI*         pAPI = &pTaskInfo->storageAPI;
+
+  SSDataBlock* pBlock = pInfo->pReaderBlock;
+  int32_t      code = 0;
+  bool         hasNext = false;
+  STsdbReader* reader = pInfo->base.dataReader;
+
+  code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
+  if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); + qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo)); + T_LONG_JMP(pTaskInfo->env, code); + } + + if (!hasNext || isTaskKilled(pTaskInfo)) { + if (isTaskKilled(pTaskInfo)) { + qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo)); + pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); + } + *pFinished = true; + return; + } + + uint32_t status = 0; + code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); + if (code != TSDB_CODE_SUCCESS) { + qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo)); + T_LONG_JMP(pTaskInfo->env, code); + } + + if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) { + *pFinished = true; + return; + } + + // current block is filter out according to filter condition, continue load the next block + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { + *pSkipped = true; + return; + } + return; +} + static SSDataBlock* getBlockForTableMergeScan(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; @@ -3247,7 +3293,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - SSDataBlock* pBlock = pInfo->pReaderBlock; + SSDataBlock* pBlock = NULL; int32_t code = 0; int64_t st = taosGetTimestampUs(); @@ -3255,53 +3301,56 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { STsdbReader* reader = pInfo->base.dataReader; while (true) { - if (!pInfo->bOnlyRetrieveBlock) { - code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); - if (code != 0) { - pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); - qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo)); - T_LONG_JMP(pTaskInfo->env, code); - } - - if (!hasNext || isTaskKilled(pTaskInfo)) { - pInfo->bNewFileset = false; - if (isTaskKilled(pTaskInfo)) { - qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo)); - pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); + if (pInfo->rtnNextDurationBlocks) { + qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", + GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); + if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { + pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]; + ++pInfo->nextDurationBlocksIdx; + } else { + for (int32_t i = 0; i < pInfo->numNextDurationBlocks; ++i) { + blockDataDestroy(pInfo->nextDurationBlocks[i]); + pInfo->nextDurationBlocks[i] = NULL; } + pInfo->rtnNextDurationBlocks = false; + pInfo->nextDurationBlocksIdx = 0; + pInfo->numNextDurationBlocks = 0; + continue; + } + } else { + + bool bFinished = false; + bool bSkipped = false; + doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); + pBlock = pInfo->pReaderBlock; + qDebug("%s table merge scan fetch block. 
finished %d skipped %d next-duration-block %d new-fileset %d",
+             GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent);
+      if (bFinished) {
+        pInfo->bNewFilesetEvent = false;
         break;
       }
 
-      if (pInfo->bNewFileset) {
-        pInfo->bOnlyRetrieveBlock = true;
-        return NULL;
+      if (pInfo->bNextDurationBlockEvent || pInfo->bNewFilesetEvent) {
+        if (!bSkipped) {
+          if (pInfo->numNextDurationBlocks >= 2) {
+            // the prefetch array holds at most two blocks; stop before overflowing it
+            qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo),
+                   pInfo->numNextDurationBlocks);
+            pInfo->bNewFilesetEvent = false;
+            break;
+          }
+          pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true);
+          ++pInfo->numNextDurationBlocks;
+        }
+        if (pInfo->bNewFilesetEvent) {
+          pInfo->rtnNextDurationBlocks = true;
+          return NULL;
+        }
+        if (pInfo->bNextDurationBlockEvent) {
+          pInfo->bNextDurationBlockEvent = false;
+          continue;
+        }
       }
+      if (bSkipped) continue;
     }
 
-    // process this data block based on the probabilities
-    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
-    if (!processThisBlock) {
-      continue;
-    }
-
-    uint32_t status = 0;
-    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
-    if (pInfo->bOnlyRetrieveBlock) {
-      pInfo->bOnlyRetrieveBlock = false;
-    }
-    if (code != TSDB_CODE_SUCCESS) {
-      qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
-      T_LONG_JMP(pTaskInfo->env, code);
-    }
-
-    if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
-      break;
-    }
-
-    // current block is filter out according to filter condition, continue load the next block
-    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
-      continue;
-    }
-
     pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
 
     if (pInfo->mergeLimit != -1) {
@@ -3310,13 +3359,13 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
 
     pOperator->resultInfo.totalRows += pBlock->info.rows;
     pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
-
     return pBlock;
   }
 
   return NULL;
 }
 
+
 SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
   int32_t tsTargetSlotId = 0;
   for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {
@@ -3348,7 +3397,13 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
 
 void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
   STableMergeScanInfo* pTmsInfo = param;
-  pTmsInfo->bNewFileset = true;
+  if (type == TSD_READER_NOTIFY_DURATION_START) {
+    pTmsInfo->bNewFilesetEvent = true;
+  } else if (type == TSD_READER_NOTIFY_NEXT_DURATION_BLOCK) {
+    pTmsInfo->bNextDurationBlockEvent = true;
+  }
+  qDebug("table merge scan received notification. type %d, fileset %d", type, info->duration.filesetId);
+  return;
 }
 
@@ -3358,7 +3413,9 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
   int32_t code = TSDB_CODE_SUCCESS;
 
   int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
-  pInfo->bNewFileset = false;
+  qDebug("%s table merge scan start duration ", GET_TASKID(pTaskInfo));
+  pInfo->bNewFilesetEvent = false;
+  pInfo->bNextDurationBlockEvent = false;
 
   pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
   int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
@@ -3388,6 +3445,8 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
 
 void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
   STableMergeScanInfo* pInfo = pOperator->info;
+  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
+  qDebug("%s table merge scan stop duration ", GET_TASKID(pTaskInfo));
 
   SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
   pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
@@ -3405,6 +3464,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
   SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
   SReadHandle*         pHandle = &pInfo->base.readHandle;
   SStorageAPI*         pAPI = &pTaskInfo->storageAPI;
+  qDebug("%s table merge scan start group %"PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId);
 
   {
     size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
@@ -3452,10 +3512,19 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
     pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
     pInfo->base.dataReader = NULL;
   }
-
+  for (int32_t i = 0; i < pInfo->numNextDurationBlocks; ++i) {
+    if (pInfo->nextDurationBlocks[i]) {
+      blockDataDestroy(pInfo->nextDurationBlocks[i]);
+      pInfo->nextDurationBlocks[i] = NULL;
+    }
+  }
+  pInfo->numNextDurationBlocks = 0;
+  pInfo->nextDurationBlocksIdx = 0;
   resetLimitInfoForNextGroup(&pInfo->limitInfo);
   taosHashCleanup(pInfo->mSkipTables);
   pInfo->mSkipTables = NULL;
+  qDebug("%s table merge scan stop group %"PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId);
+
   return TSDB_CODE_SUCCESS;
 }
 
@@ -3535,7 +3604,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
       pOperator->resultInfo.totalRows += pBlock->info.rows;
       return pBlock;
     } else {
-      if (pInfo->bNewFileset) {
+      if (pInfo->bNewFilesetEvent) {
         stopDurationForGroupTableMergeScan(pOperator);
         startDurationForGroupTableMergeScan(pOperator);
       } else {
@@ -3566,6 +3635,13 @@ void destroyTableMergeScanOperatorInfo(void* param) {
   pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
   pTableScanInfo->base.dataReader = NULL;
 
+  for (int32_t i = 0; i < pTableScanInfo->numNextDurationBlocks; ++i) {
+    if (pTableScanInfo->nextDurationBlocks[i] != NULL) {
+      blockDataDestroy(pTableScanInfo->nextDurationBlocks[i]);
+      pTableScanInfo->nextDurationBlocks[i] = NULL;
+    }
+  }
+
   taosArrayDestroy(pTableScanInfo->sortSourceParams);
   tsortDestroySortHandle(pTableScanInfo->pSortHandle);
   pTableScanInfo->pSortHandle = NULL;
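
The notification flow in this patch works as follows: the tsdb reader emits `TSD_READER_NOTIFY_DURATION_START` when a new duration (fileset) begins, and `TSD_READER_NOTIFY_NEXT_DURATION_BLOCK` when a file block it is about to return already belongs to the next duration while pre-fileset memtable rows are still being drained. Below is a minimal, self-contained C sketch of a consumer callback in the style of `tableMergeScanTsdbNotifyCb`; it is not part of the patch, and the `DemoMergeState` struct, `demoNotifyCb` function, and `main` driver are hypothetical stand-ins, while the enum and union mirror the definitions added to `include/libs/executor/storageapi.h`.

```c
/* Illustrative sketch only: DemoMergeState and demoNotifyCb are hypothetical
 * names; the enum/union below mirror the definitions added in this patch. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef enum {
  TSD_READER_NOTIFY_DURATION_START,
  TSD_READER_NOTIFY_NEXT_DURATION_BLOCK,
} ETsdReaderNotifyType;

typedef union {
  struct {
    int32_t filesetId;
  } duration;
} STsdReaderNotifyInfo;

/* State mirroring the two event flags kept in STableMergeScanInfo. */
typedef struct {
  bool bNewFilesetEvent;        /* a new duration (fileset) has started                  */
  bool bNextDurationBlockEvent; /* the last fetched block belongs to the next duration   */
} DemoMergeState;

static void demoNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo *info, void *param) {
  DemoMergeState *st = param;
  if (type == TSD_READER_NOTIFY_DURATION_START) {
    st->bNewFilesetEvent = true;  /* finish the current sort/merge, then start a new duration */
  } else if (type == TSD_READER_NOTIFY_NEXT_DURATION_BLOCK) {
    st->bNextDurationBlockEvent = true;  /* stash this block; it must not be merged yet */
  }
  printf("notify type %d, fileset %d\n", (int)type, (int)info->duration.filesetId);
}

int main(void) {
  DemoMergeState       st = {0};
  STsdReaderNotifyInfo info = {.duration = {.filesetId = 7}};
  demoNotifyCb(TSD_READER_NOTIFY_NEXT_DURATION_BLOCK, &info, &st);
  demoNotifyCb(TSD_READER_NOTIFY_DURATION_START, &info, &st);
  return (st.bNewFilesetEvent && st.bNextDurationBlockEvent) ? 0 : 1;
}
```

In `getBlockForTableMergeScan` these two flags decide whether a fetched block is buffered into `nextDurationBlocks` (at most two entries) or returned to the sorter directly; once the new duration starts, the buffered blocks are replayed before any further reads.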