diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index fdb72a9d5f..fd56b5f5ae 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1485,6 +1485,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SSDataBlock* pBlock = createDataBlock(); pBlock->info = pDataBlock->info; + pBlock->info.rows = 0; pBlock->info.capacity = 0; pBlock->info.rowSize = 0; @@ -1512,6 +1513,10 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); p->nData = pDataBlock->info.pks[1].nData; memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData); + uInfo("===========clone block, with varchar, %p, 0---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[0].pData, pDataBlock, pDataBlock->info.pks[0].pData); + uInfo("===========clone block, with varchar, %p, 1---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[1].pData, pDataBlock, pDataBlock->info.pks[1].pData); + } else { + uInfo("===========clone block without varchar pk, %p, src:%p", pBlock, pDataBlock); } if (copyData) { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c2032554b6..8e6f637d81 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -340,22 +340,21 @@ typedef struct STableMergeScanInfo { int32_t scanTimes; int32_t readIdx; SSDataBlock* pResBlock; - SSampleExecInfo sample; // sample execution info - SSHashObj* mTableNumRows; // uid->num of table rows - SHashObj* mSkipTables; - int64_t mergeLimit; + SSampleExecInfo sample; // sample execution info + SSHashObj* mTableNumRows; // uid->num of table rows + SHashObj* mSkipTables; + int64_t mergeLimit; SSortExecInfo sortExecInfo; - bool needCountEmptyTable; - bool bGroupProcessed; // the group return data means processed - bool filesetDelimited; - bool bNewFilesetEvent; - bool bNextDurationBlockEvent; - int32_t numNextDurationBlocks; - SSDataBlock* nextDurationBlocks[2]; - bool rtnNextDurationBlocks; - int32_t nextDurationBlocksIdx; - - bool bSortRowId; + bool needCountEmptyTable; + bool bGroupProcessed; // the group return data means processed + bool filesetDelimited; + bool bNewFilesetEvent; + bool bNextDurationBlockEvent; + int32_t numNextDurationBlocks; + SSDataBlock* nextDurationBlocks[2]; + bool rtnNextDurationBlocks; + int32_t nextDurationBlocksIdx; + bool bSortRowId; STmsSubTablesMergeInfo* pSubTablesMergeInfo; } STableMergeScanInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7274811812..1a47895c05 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4069,14 +4069,13 @@ static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) { } static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) { - STableMergeScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; - - SSDataBlock* pBlock = pInfo->pReaderBlock; - int32_t code = 0; - bool hasNext = false; - STsdbReader* reader = pInfo->base.dataReader; + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SSDataBlock* pBlock = pInfo->pReaderBlock; + int32_t code = 0; + bool hasNext = false; + STsdbReader* reader = pInfo->base.dataReader; code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); if (code != 0) { @@ -4112,27 +4111,23 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe *pSkipped = true; return; } + return; } static SSDataBlock* getBlockForTableMergeScan(void* param) { STableMergeScanSortSourceParam* source = param; - SOperatorInfo* pOperator = source->pOperator; - STableMergeScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; - SSDataBlock* pBlock = NULL; - int32_t code = 0; + SOperatorInfo* pOperator = source->pOperator; + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSDataBlock* pBlock = NULL; + int64_t st = taosGetTimestampUs(); - int64_t st = taosGetTimestampUs(); - bool hasNext = false; - - STsdbReader* reader = pInfo->base.dataReader; while (true) { if (pInfo->rtnNextDurationBlocks) { - qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", - GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); + qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", + GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]; @@ -4149,13 +4144,12 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { continue; } } else { - bool bFinished = false; bool bSkipped = false; doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); pBlock = pInfo->pReaderBlock; - qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", - GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); + qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", + GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); if (bFinished) { pInfo->bNewFilesetEvent = false; break; @@ -4166,15 +4160,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true); ++pInfo->numNextDurationBlocks; if (pInfo->numNextDurationBlocks > 2) { - qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), pInfo->numNextDurationBlocks); + qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), + pInfo->numNextDurationBlocks); pInfo->bNewFilesetEvent = false; break; } } + if (pInfo->bNewFilesetEvent) { pInfo->rtnNextDurationBlocks = true; return NULL; } + if (pInfo->bNextDurationBlockEvent) { pInfo->bNextDurationBlockEvent = false; continue; @@ -4182,19 +4179,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { } if (bSkipped) continue; } + pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; - pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - + return pBlock; } return NULL; } - SArray* generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order) { SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); SBlockOrderInfo biTs = {0}; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 3dbf29e3a8..cd1a858175 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1688,12 +1688,12 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId); - int64_t firstRowTs = *(int64_t*)tsCol->pData; - if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || + int64_t firstRowTs = *(int64_t*)tsCol->pData; + if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || (pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { if (bExtractedBlock) { blockDataDestroy(pBlk); - } + } continue; } }