fix(query): set the group id.

This commit is contained in:
Haojun Liao 2022-09-13 18:22:58 +08:00
parent 60574763a9
commit ad45c0a848
2 changed files with 26 additions and 12 deletions

View File

@ -800,9 +800,15 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
taosMemoryFreeClear(pColInfoData); taosMemoryFreeClear(pColInfoData);
} }
for (int i = 0; i < taosArrayGetSize(res); i++) { size_t numOfTables = taosArrayGetSize(res);
for (int i = 0; i < numOfTables; i++) {
STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0}; STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
taosArrayPush(pListInfo->pTableList, &info); void* p = taosArrayPush(pListInfo->pTableList, &info);
if (p == NULL) {
taosArrayDestroy(res);
return TSDB_CODE_OUT_OF_MEMORY;
}
qDebug("tagfilter get uid:%ld", info.uid); qDebug("tagfilter get uid:%ld", info.uid);
} }

View File

@ -4872,8 +4872,8 @@ void destroyMAIOperatorInfo(void* param) {
} }
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) { static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize); SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
return pResult; return pResult;
} }
@ -4998,15 +4998,23 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
if (pMiaInfo->groupId != pBlock->info.groupId && pMiaInfo->groupId != 0) { if (pMiaInfo->groupId == 0) {
// if there are unclosed time window, close it firstly. if (pMiaInfo->groupId != pBlock->info.groupId) {
ASSERT(pMiaInfo->curTs != INT64_MIN); pMiaInfo->groupId = pBlock->info.groupId;
finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo); }
} else {
if (pMiaInfo->groupId != pBlock->info.groupId) {
// if there are unclosed time window, close it firstly.
ASSERT(pMiaInfo->curTs != INT64_MIN);
finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
pMiaInfo->prefetchedBlock = pBlock; pMiaInfo->prefetchedBlock = pBlock;
pMiaInfo->curTs = INT64_MIN; pMiaInfo->curTs = INT64_MIN;
pMiaInfo->groupId = 0; pMiaInfo->groupId = 0;
break; break;
} else {
// continue
}
} }
getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag); getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag);