support table merge scan

This commit is contained in:
Bob Liu 2023-12-28 16:18:38 +08:00
parent a5f88a86e9
commit 424ab1bbe3
3 changed files with 34 additions and 13 deletions

View File

@ -307,7 +307,8 @@ typedef struct STableMergeScanInfo {
SHashObj* mSkipTables; SHashObj* mSkipTables;
int64_t mergeLimit; int64_t mergeLimit;
SSortExecInfo sortExecInfo; SSortExecInfo sortExecInfo;
bool needCountEmptyTable;
bool bGroupProcessed; // the group return data means processed
bool filesetDelimited; bool filesetDelimited;
bool bNewFilesetEvent; bool bNewFilesetEvent;
bool bNextDurationBlockEvent; bool bNextDurationBlockEvent;

View File

@ -700,17 +700,14 @@ static void markTableProcessed(STableScanInfo* pTableScanInfo, uint64_t uid) {
taosHashRemove(pTableScanInfo->pRemainTables, &uid, sizeof(uid)); taosHashRemove(pTableScanInfo->pRemainTables, &uid, sizeof(uid));
} }
static SSDataBlock* getBlockForEmptyTable(SOperatorInfo* pOperator, const STableKeyInfo* tbInfo) { static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBase* pBase, SSDataBlock* pBlock,
STableScanInfo* pTableScanInfo = pOperator->info; const STableKeyInfo* tbInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
blockDataEmpty(pBlock); blockDataEmpty(pBlock);
pBlock->info.rows = 1; pBlock->info.rows = 1;
pBlock->info.id.uid = tbInfo->uid; pBlock->info.id.uid = tbInfo->uid;
pBlock->info.id.groupId = tbInfo->groupId; pBlock->info.id.groupId = tbInfo->groupId;
// only one row: set all col data to null & hasNull // only one row: set all col data to null & hasNull
int32_t col_num = blockDataGetNumOfCols(pBlock); int32_t col_num = blockDataGetNumOfCols(pBlock);
for (int32_t i = 0; i < col_num; ++i) { for (int32_t i = 0; i < col_num; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
@ -718,7 +715,14 @@ static SSDataBlock* getBlockForEmptyTable(SOperatorInfo* pOperator, const STable
} }
// set tag/tbname // set tag/tbname
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); doSetTagColumnData(pBase, pBlock, pTaskInfo, pBlock->info.rows);
return pBlock;
}
static SSDataBlock* getBlockForEmptyTable(SOperatorInfo* pOperator, const STableKeyInfo* tbInfo) {
STableScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock =
getOneRowResultBlock(pOperator->pTaskInfo, &pTableScanInfo->base, pTableScanInfo->pResBlock, tbInfo);
pOperator->resultInfo.totalRows++; pOperator->resultInfo.totalRows++;
return pBlock; return pBlock;
@ -3585,7 +3589,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
} }
pInfo->tableEndIndex = i - 1; pInfo->tableEndIndex = i - 1;
} }
pInfo->bGroupProcessed = false;
int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableStartIdx = pInfo->tableStartIndex;
int32_t tableEndIdx = pInfo->tableEndIndex; int32_t tableEndIdx = pInfo->tableEndIndex;
@ -3707,9 +3711,14 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity,
pOperator); pOperator);
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
}
if (pBlock != NULL) { if (pBlock != NULL) {
pBlock->info.id.groupId = pInfo->groupId; pBlock->info.id.groupId = pInfo->groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->bGroupProcessed = true;
return pBlock; return pBlock;
} else { } else {
if (pInfo->bNewFilesetEvent) { if (pInfo->bNewFilesetEvent) {
@ -3864,6 +3873,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
} else { } else {
pInfo->filesetDelimited = pTableScanNode->filesetDelimited; pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
} }
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;

View File

@ -120,12 +120,21 @@ class TDTestCase:
tdSql.checkRows(all_tb_num) tdSql.checkRows(all_tb_num)
# elapsed: continuous duration in a statistical period, table merge scan # elapsed: continuous duration in a statistical period, table merge scan
tdSql.query(f" select count(c1), max(c5), avg(c5), elapsed(ts), spread(c1) from {self.dbname}.{self.stable} group by tbname") tdSql.query(f" select count(c1), max(c5), last_row(c5), elapsed(ts), spread(c1) from {self.dbname}.{self.stable} group by tbname")
tdSql.checkRows(nonempty_tb_num) tdSql.checkRows(all_tb_num)
tdSql.query(f" select count(c1), max(c1), avg(c1), elapsed(ts), spread(c1) from {self.dbname}.{self.stable} partition by tbname") tdSql.query(f" select count(c1), min(c1), avg(c1), elapsed(ts), mode(c1) from {self.dbname}.{self.stable} partition by tbname")
tdSql.checkRows(all_tb_num)
tdSql.query(f" select count(c1), elapsed(ts), twa(c1), irate(c1), leastsquares(c1,1,1) from {self.dbname}.{self.stable} partition by tbname")
tdSql.checkRows(all_tb_num)
tdSql.query(f" select avg(c1), elapsed(ts), twa(c1), irate(c1) from {self.dbname}.{self.stable} partition by tbname")
tdSql.checkRows(nonempty_tb_num) tdSql.checkRows(nonempty_tb_num)
# if nonempty_tb_num > 0:
# tdSql.query(f" select avg(c1), percentile(c1, 50) from {self.dbname}.sub_{self.stable}_1")
# tdSql.checkRows(1)
def test_innerSelect(self, check_num): def test_innerSelect(self, check_num):
tdSql.query(f"select * from (select count(c1) from {self.dbname}.{self.stable} group by tbname) ") tdSql.query(f"select * from (select count(c1) from {self.dbname}.{self.stable} group by tbname) ")