fix: table group error in stream

This commit is contained in:
wangmm0220 2022-06-24 11:24:31 +08:00
parent a95f98f7e7
commit 4f305c74d1
3 changed files with 8 additions and 5 deletions

View File

@ -4055,6 +4055,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOperator; return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
pTableListInfo->needSortTableByGroupId = true;
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
if(code){ if(code){
return NULL; return NULL;

View File

@ -510,10 +510,11 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
if(pInfo->currentGroupId == -1){ if(pInfo->currentGroupId == -1){
pInfo->currentGroupId++; pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
doSetOperatorCompleted(pOperator); setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL; return NULL;
} }
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
tsdbCleanupReadHandle(pInfo->dataReader);
tsdbReaderT* pReader = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId); tsdbReaderT* pReader = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId);
pInfo->dataReader = pReader; pInfo->dataReader = pReader;
} }
@ -525,7 +526,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pInfo->currentGroupId++; pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
doSetOperatorCompleted(pOperator); setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL; return NULL;
} }
@ -541,7 +542,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return result; return result;
} }
doSetOperatorCompleted(pOperator); setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL; return NULL;
} }
@ -821,8 +822,9 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info; STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
pTableScanInfo->cond.twindows[0] = win; pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0; pTableScanInfo->curTWinIdx = 0;
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); // tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
pTableScanInfo->scanTimes = 0; pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1;
return true; return true;
} }

View File

@ -1189,7 +1189,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize}, {.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize}, {.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
{.pName = "SmaIndex", .optimizeFunc = smaOptimize}, {.pName = "SmaIndex", .optimizeFunc = smaOptimize},
// {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize}, {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize} {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}
}; };
// clang-format on // clang-format on