diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 63be7eaa67..e4c650a50b 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -570,45 +570,39 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc SSDataBlock* pDstBlock = createDataBlock(); pDstBlock->info = pDataBlock->info; + pDstBlock->info.id = pOperator->resultDataBlockId; pDstBlock->info.capacity = 0; pDstBlock->info.rowSize = 0; - size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; - int32_t slotId = pExpr->base.pParam[0].pCol->slotId; - SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId); - SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info}; - blockDataAppendColInfo(pDstBlock, &colInfo); - } - - int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - blockDataDestroy(pDstBlock); - return NULL; - } - - if (pDataBlock->pBlockAgg != NULL) { + size_t numOfCols = pOperator->exprSupp.numOfExprs; + if (pDataBlock->pBlockAgg) { pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg)); if (pDstBlock->pBlockAgg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; blockDataDestroy(pDstBlock); return NULL; } - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; - int32_t slotId = pExpr->base.pParam[0].pCol->slotId; - pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId]; - } } for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; int32_t slotId = pExpr->base.pParam[0].pCol->slotId; SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId); + SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info}; + blockDataAppendColInfo(pDstBlock, &colInfo); + SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); + int32_t code = doEnsureCapacity(pDst, &pDstBlock->info, pDataBlock->info.rows, false); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + blockDataDestroy(pDstBlock); + return NULL; + } colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); + + if (pDataBlock->pBlockAgg) { + pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId]; + } } return pDstBlock; @@ -707,13 +701,13 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { T_LONG_JMP(pTaskInfo->env, terrno); } if (pGroupInfo->blockForNotLoaded == NULL) { - pGroupInfo->blockForNotLoaded = taosArrayInit(1, sizeof(SSDataBlock)); + pGroupInfo->blockForNotLoaded = taosArrayInit(0, sizeof(SSDataBlock*)); pGroupInfo->offsetForNotLoaded = 0; } dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId; dataNotLoadBlock->info.dataLoad = 0; pInfo->binfo.pRes->info.rows = pBlock->info.rows; - taosArrayInsert(pGroupInfo->blockForNotLoaded, pGroupInfo->blockForNotLoaded->size, &dataNotLoadBlock); + taosArrayPush(pGroupInfo->blockForNotLoaded, &dataNotLoadBlock); break; } } @@ -863,12 +857,10 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, terrno); } if (*(int32_t*)page == 0) { + releaseBufPage(pInfo->pBuf, page); SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo); if (ret != NULL) return ret; - releaseBufPage(pInfo->pBuf, page); - if (pInfo->pageIndex < taosArrayGetSize(pGroupInfo->pPageList)) { - pInfo->pageIndex += 1; - } else if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) { + if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) { pInfo->groupIndex++; pInfo->pageIndex = 0; } else { @@ -951,8 +943,6 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { while (pGroupIter != NULL) { SDataGroupInfo* pGroupInfo = pGroupIter; taosArrayPush(groupArray, pGroupInfo); - static int i = 0; - qInfo("groupArray push %p %p %d times", pGroupInfo, pGroupInfo->blockForNotLoaded, ++i); pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter); }