From 424ab1bbe32672570d4b8549b9b8f375e55bcbb5 Mon Sep 17 00:00:00 2001 From: Bob Liu Date: Thu, 28 Dec 2023 16:18:38 +0800 Subject: [PATCH] support table merge scan --- source/libs/executor/inc/executorInt.h | 3 ++- source/libs/executor/src/scanoperator.c | 27 ++++++++++++++------ tests/system-test/2-query/group_partition.py | 17 +++++++++--- 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index b87dee475d..ae38d4940c 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -307,7 +307,8 @@ typedef struct STableMergeScanInfo { SHashObj* mSkipTables; int64_t mergeLimit; SSortExecInfo sortExecInfo; - + bool needCountEmptyTable; + bool bGroupProcessed; // the group return data means processed bool filesetDelimited; bool bNewFilesetEvent; bool bNextDurationBlockEvent; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index dd6dff0301..a1f9baa082 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -700,17 +700,14 @@ static void markTableProcessed(STableScanInfo* pTableScanInfo, uint64_t uid) { taosHashRemove(pTableScanInfo->pRemainTables, &uid, sizeof(uid)); } -static SSDataBlock* getBlockForEmptyTable(SOperatorInfo* pOperator, const STableKeyInfo* tbInfo) { - STableScanInfo* pTableScanInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSDataBlock* pBlock = pTableScanInfo->pResBlock; - +static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBase* pBase, SSDataBlock* pBlock, + const STableKeyInfo* tbInfo) { blockDataEmpty(pBlock); pBlock->info.rows = 1; pBlock->info.id.uid = tbInfo->uid; pBlock->info.id.groupId = tbInfo->groupId; - // only one row: set all col data to null & hasNull + // only one row: set all col data to null & hasNull int32_t col_num = blockDataGetNumOfCols(pBlock); for (int32_t i = 0; i < col_num; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); @@ -718,7 +715,14 @@ static SSDataBlock* getBlockForEmptyTable(SOperatorInfo* pOperator, const STable } // set tag/tbname - doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); + doSetTagColumnData(pBase, pBlock, pTaskInfo, pBlock->info.rows); + return pBlock; +} + +static SSDataBlock* getBlockForEmptyTable(SOperatorInfo* pOperator, const STableKeyInfo* tbInfo) { + STableScanInfo* pTableScanInfo = pOperator->info; + SSDataBlock* pBlock = + getOneRowResultBlock(pOperator->pTaskInfo, &pTableScanInfo->base, pTableScanInfo->pResBlock, tbInfo); pOperator->resultInfo.totalRows++; return pBlock; @@ -3585,7 +3589,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { } pInfo->tableEndIndex = i - 1; } - + pInfo->bGroupProcessed = false; int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; @@ -3707,9 +3711,14 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator); + if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) { + STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo); + } if (pBlock != NULL) { pBlock->info.id.groupId = pInfo->groupId; pOperator->resultInfo.totalRows += pBlock->info.rows; + pInfo->bGroupProcessed = true; return pBlock; } else { if (pInfo->bNewFilesetEvent) { @@ -3864,6 +3873,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN } else { pInfo->filesetDelimited = pTableScanNode->filesetDelimited; } + pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable; + setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; diff --git a/tests/system-test/2-query/group_partition.py b/tests/system-test/2-query/group_partition.py index 8b06f0d6fd..e228351f0e 100644 --- a/tests/system-test/2-query/group_partition.py +++ b/tests/system-test/2-query/group_partition.py @@ -120,12 +120,21 @@ class TDTestCase: tdSql.checkRows(all_tb_num) # elapsed: continuous duration in a statistical period, table merge scan - tdSql.query(f" select count(c1), max(c5), avg(c5), elapsed(ts), spread(c1) from {self.dbname}.{self.stable} group by tbname") - tdSql.checkRows(nonempty_tb_num) - - tdSql.query(f" select count(c1), max(c1), avg(c1), elapsed(ts), spread(c1) from {self.dbname}.{self.stable} partition by tbname") + tdSql.query(f" select count(c1), max(c5), last_row(c5), elapsed(ts), spread(c1) from {self.dbname}.{self.stable} group by tbname") + tdSql.checkRows(all_tb_num) + + tdSql.query(f" select count(c1), min(c1), avg(c1), elapsed(ts), mode(c1) from {self.dbname}.{self.stable} partition by tbname") + tdSql.checkRows(all_tb_num) + + tdSql.query(f" select count(c1), elapsed(ts), twa(c1), irate(c1), leastsquares(c1,1,1) from {self.dbname}.{self.stable} partition by tbname") + tdSql.checkRows(all_tb_num) + + tdSql.query(f" select avg(c1), elapsed(ts), twa(c1), irate(c1) from {self.dbname}.{self.stable} partition by tbname") tdSql.checkRows(nonempty_tb_num) + # if nonempty_tb_num > 0: + # tdSql.query(f" select avg(c1), percentile(c1, 50) from {self.dbname}.sub_{self.stable}_1") + # tdSql.checkRows(1) def test_innerSelect(self, check_num): tdSql.query(f"select * from (select count(c1) from {self.dbname}.{self.stable} group by tbname) ")