diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c33b6622e3..9e6613d3c2 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -506,7 +506,8 @@ typedef struct SPartitionOperatorInfo { SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file int32_t rowCapacity; // maximum number of rows for each buffer page int32_t* columnOffset; // start position for each column data - void* pGroupIter; // group iterator + SArray* sortedGroupArray; // SDataGroupInfo sorted by group id + int32_t groupIndex; // group index int32_t pageIndex; // page index of current group SSDataBlock* pUpdateRes; SExprSupp scalarSup; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 527f4520bf..482326a30e 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -586,24 +586,30 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) { while( (ite = taosHashIterate(pInfo->pGroupSet, ite)) != NULL ) { taosArrayDestroy( ((SDataGroupInfo *)ite)->pPageList); } - taosHashClear(pInfo->pGroupSet); + taosArrayClear(pInfo->sortedGroupArray); clearDiskbasedBuf(pInfo->pBuf); } +static int compareDataGroupInfo(const void* group1, const void* group2) { + const SDataGroupInfo* pGroupInfo1 = group1; + const SDataGroupInfo* pGroupInfo2 = group2; + return pGroupInfo1->groupId - pGroupInfo2->groupId; +} + static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { SPartitionOperatorInfo* pInfo = pOperator->info; - SDataGroupInfo* pGroupInfo = pInfo->pGroupIter; - if (pInfo->pGroupIter == NULL || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { + SDataGroupInfo* pGroupInfo = (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL; + if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { // try next group data - pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter); - if (pInfo->pGroupIter == NULL) { + ++pInfo->groupIndex; + if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) { doSetOperatorCompleted(pOperator); clearPartitionOperator(pInfo); return NULL; } - pGroupInfo = pInfo->pGroupIter; + pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex); pInfo->pageIndex = 0; } @@ -657,6 +663,20 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { doHashPartition(pOperator, pBlock); } + SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo)); + void* pGroupIter = NULL; + pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL); + while (pGroupIter != NULL) { + SDataGroupInfo* pGroupInfo = pGroupIter; + taosArrayPush(groupArray, pGroupInfo); + pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter); + } + + taosArraySort(groupArray, compareDataGroupInfo); + pInfo->sortedGroupArray = groupArray; + pInfo->groupIndex = -1; + taosHashClear(pInfo->pGroupSet); + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->status = OP_RES_TO_RETURN; @@ -676,6 +696,7 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pGroupColVals); taosMemoryFree(pInfo->keyBuf); + taosArrayDestroy(pInfo->sortedGroupArray); taosHashCleanup(pInfo->pGroupSet); taosMemoryFree(pInfo->columnOffset);