doGroupedTableScan
This commit is contained in:
parent
2c9fa56e9c
commit
3faf1514f4
|
@ -657,7 +657,7 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData,
|
|||
|
||||
|
||||
// record processed (non empty) table
|
||||
static int32_t insertTableToProcessed(STableScanInfo* pTableScanInfo, uint64_t uid) {
|
||||
static int32_t markTableProcessed(STableScanInfo* pTableScanInfo, uint64_t uid) {
|
||||
if (!pTableScanInfo->needCountEmptyTable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -770,6 +770,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey
|
|||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
bool outputAll = pTableScanInfo->base.pTableListInfo->oneTableForEachGroup;
|
||||
|
||||
// The read handle is not initialized yet, since no qualified tables exists
|
||||
if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -780,7 +781,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey
|
|||
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||
if (p != NULL) {
|
||||
insertTableToProcessed(pTableScanInfo, p->info.id.uid);
|
||||
markTableProcessed(pTableScanInfo, p->info.id.uid);
|
||||
return p;
|
||||
}
|
||||
|
||||
|
@ -809,7 +810,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey
|
|||
while (pTableScanInfo->scanTimes < total) {
|
||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||
if (p != NULL) {
|
||||
insertTableToProcessed(pTableScanInfo, p->info.id.uid);
|
||||
markTableProcessed(pTableScanInfo, p->info.id.uid);
|
||||
return p;
|
||||
}
|
||||
|
||||
|
@ -827,30 +828,30 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey
|
|||
}
|
||||
|
||||
if (pTableScanInfo->needCountEmptyTable) {
|
||||
if (num == 0 && 0 == taosHashGetSize(pTableScanInfo->pValuedTables)) {
|
||||
// table by table, num is 0
|
||||
if (!pTableScanInfo->processingEmptyTable) {
|
||||
pTableScanInfo->processingEmptyTable = true;
|
||||
// current table is empty, fill result block info & return
|
||||
const STableKeyInfo* info = tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->currentTable);
|
||||
return getBlockForEmptyTable(pOperator, info);
|
||||
}
|
||||
|
||||
} else if (num > taosHashGetSize(pTableScanInfo->pValuedTables)) {
|
||||
// group by group, num >= 1
|
||||
// pList is NULL in mode TABLE_SCAN__TABLE_ORDER for streamscan, no need to process
|
||||
// pList not NULL, group by group, num >= 1
|
||||
int32_t tb_cnt = taosHashGetSize(pTableScanInfo->pValuedTables);
|
||||
if (pList && num > tb_cnt) {
|
||||
if (!pTableScanInfo->processingEmptyTable) {
|
||||
pTableScanInfo->processingEmptyTable = true;
|
||||
pTableScanInfo->currentTable = 0;
|
||||
}
|
||||
if (pTableScanInfo->currentTable < num) {
|
||||
// loop: get empty table uid & process
|
||||
while (pTableScanInfo->currentTable < num) {
|
||||
const STableKeyInfo* info = pList + pTableScanInfo->currentTable++;
|
||||
if (pTableScanInfo->pValuedTables &&
|
||||
NULL != taosHashGet(pTableScanInfo->pValuedTables, &info->uid, sizeof(info->uid))) {
|
||||
} else {
|
||||
return getBlockForEmptyTable(pOperator, info);
|
||||
if (outputAll) {
|
||||
// loop: get empty table uid & process
|
||||
while (pTableScanInfo->currentTable < num) {
|
||||
const STableKeyInfo* info = pList + pTableScanInfo->currentTable++;
|
||||
if (pTableScanInfo->pValuedTables &&
|
||||
NULL != taosHashGet(pTableScanInfo->pValuedTables, &info->uid, sizeof(info->uid))) {
|
||||
} else {
|
||||
return getBlockForEmptyTable(pOperator, info);
|
||||
}
|
||||
}
|
||||
} else if (tb_cnt == 0) {
|
||||
// only need one & all empty table in this group
|
||||
// output first one
|
||||
pTableScanInfo->currentTable = num;
|
||||
return getBlockForEmptyTable(pOperator, pList);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1005,7 +1006,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
STableScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
||||
|
||||
if (pOperator->pOperatorGetParam) {
|
||||
pOperator->dynamicTask = true;
|
||||
int32_t code = createTableListInfoFromParam(pOperator);
|
||||
|
|
|
@ -758,7 +758,6 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
|||
pAgg->isGroupTb = pAgg->pGroupKeys ? keysHasTbname(pAgg->pGroupKeys) : 0;
|
||||
pAgg->isPartTb = pSelect->pPartitionByList ? keysHasTbname(pSelect->pPartitionByList) : 0;
|
||||
pAgg->hasGroup = pAgg->pGroupKeys || pSelect->pPartitionByList;
|
||||
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pAgg;
|
||||
|
|
|
@ -2293,7 +2293,6 @@ static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogic
|
|||
} else {
|
||||
pSubplan->msgType = TDMT_SCH_MERGE_QUERY;
|
||||
}
|
||||
|
||||
code = createPhysiNode(pCxt, pLogicSubplan->pNode, pSubplan, &pSubplan->pNode);
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
|
||||
code = createDataDispatcher(pCxt, pSubplan->pNode, &pSubplan->pDataSink);
|
||||
|
|
Loading…
Reference in New Issue