diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 592231f043..fba8193ee5 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -624,6 +624,8 @@ typedef struct SDataGroupInfo { uint64_t groupId; int64_t numOfRows; SArray* pPageList; + SArray* blockForNotLoaded; // SSDataBlock that data is not loaded + int32_t offsetForNotLoaded; // read offset for SSDataBlock that data is not loaded } SDataGroupInfo; typedef struct SWindowRowsSup { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index c44ba4eecb..9473f650ae 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -53,8 +53,6 @@ typedef struct SPartitionOperatorInfo { int32_t rowCapacity; // maximum number of rows for each buffer page int32_t* columnOffset; // start position for each column data SArray* sortedGroupArray; // SDataGroupInfo sorted by group id - SArray* blockForNotLoaded; // SSDataBlock that data is not loaded - int32_t offsetForNotLoaded;// read offset for SSDataBlock that data is not loaded int32_t groupIndex; // group index int32_t pageIndex; // page index of current group SExprSupp scalarSup; @@ -573,21 +571,14 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc SSDataBlock* pDstBlock = createDataBlock(); pDstBlock->info = pDataBlock->info; - pDstBlock->info.rows = 0; + copyPkVal(&pDstBlock->info, &pDataBlock->info); + pDstBlock->info.rows = pDataBlock->info.rows; pDstBlock->info.capacity = 0; pDstBlock->info.rowSize = 0; pDstBlock->info.id = pDataBlock->info.id; pDstBlock->info.blankFill = pDataBlock->info.blankFill; size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); - - pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg)); - if (pDstBlock->pBlockAgg == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - copyPkVal(&pDstBlock->info, &pDataBlock->info); - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; int32_t slotId = pExpr->base.pParam[0].pCol->slotId; @@ -605,24 +596,25 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc return NULL; } - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; - int32_t slotId = pExpr->base.pParam[0].pCol->slotId; - if (slotId < numOfCols) { - pDstBlock->pBlockAgg[slotId] = pDataBlock->pBlockAgg[i]; - pDstBlock->pBlockAgg[slotId].colId = i; + if (pDataBlock->pBlockAgg != NULL) { + pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg)); + if (pDstBlock->pBlockAgg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + blockDataDestroy(pDstBlock); + return NULL; + } + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { + SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; + int32_t slotId = pExpr->base.pParam[0].pCol->slotId; + if (slotId < numOfCols) { + pDstBlock->pBlockAgg[slotId] = pDataBlock->pBlockAgg[i]; + SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); + SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, slotId); + colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); + } } - - // SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId); - // if (pSrc) { - // SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); - // colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); - // } } - pDstBlock->info.rows = pDataBlock->info.rows; - pDstBlock->info.capacity = pDataBlock->info.rows; - return pDstBlock; } @@ -718,13 +710,14 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { if (dataNotLoadBlock == NULL) { T_LONG_JMP(pTaskInfo->env, terrno); } - if (pInfo->blockForNotLoaded == NULL) { - pInfo->blockForNotLoaded = taosArrayInit(1, sizeof(SSDataBlock)); - pInfo->offsetForNotLoaded = 0; + if (pGroupInfo->blockForNotLoaded == NULL) { + pGroupInfo->blockForNotLoaded = taosArrayInit(1, sizeof(SSDataBlock)); + pGroupInfo->offsetForNotLoaded = 0; } dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId; dataNotLoadBlock->info.dataLoad = 0; - taosArrayInsert(pInfo->blockForNotLoaded, pInfo->blockForNotLoaded->size, &dataNotLoadBlock); + pInfo->binfo.pRes->info.rows = pBlock->info.rows; + taosArrayInsert(pGroupInfo->blockForNotLoaded, pGroupInfo->blockForNotLoaded->size, &dataNotLoadBlock); break; } } @@ -808,18 +801,18 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) { int32_t size = taosArrayGetSize(pInfo->sortedGroupArray); for (int32_t i = 0; i < size; i++) { SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i); + if (pGp->blockForNotLoaded) { + for (int32_t i = 0; i < pGp->blockForNotLoaded->size; i++) { + SSDataBlock** pBlock = taosArrayGet(pGp->blockForNotLoaded, i); + blockDataDestroy(*pBlock); + } + taosArrayClear(pGp->blockForNotLoaded); + pGp->offsetForNotLoaded = 0; + } taosArrayDestroy(pGp->pPageList); } taosArrayClear(pInfo->sortedGroupArray); clearDiskbasedBuf(pInfo->pBuf); - if (pInfo->blockForNotLoaded) { - for (int32_t i = 0; i < pInfo->blockForNotLoaded->size; i++) { - SSDataBlock** pBlock = taosArrayGet(pInfo->blockForNotLoaded, i); - blockDataDestroy(*pBlock); - } - taosArrayClear(pInfo->blockForNotLoaded); - pInfo->offsetForNotLoaded = 0; - } } static int compareDataGroupInfo(const void* group1, const void* group2) { @@ -833,19 +826,13 @@ static int compareDataGroupInfo(const void* group1, const void* group2) { return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1; } -static SSDataBlock* buildPartitionResultForNotLoadBlock(SOperatorInfo* pOperator) { - SPartitionOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - if (pInfo->blockForNotLoaded && pInfo->offsetForNotLoaded < pInfo->blockForNotLoaded->size) { - SSDataBlock** pBlock = taosArrayGet(pInfo->blockForNotLoaded, pInfo->offsetForNotLoaded); - pInfo->offsetForNotLoaded++; +static SSDataBlock* buildPartitionResultForNotLoadBlock(SDataGroupInfo* pGroupInfo) { + if (pGroupInfo->blockForNotLoaded && pGroupInfo->offsetForNotLoaded < pGroupInfo->blockForNotLoaded->size) { + SSDataBlock** pBlock = taosArrayGet(pGroupInfo->blockForNotLoaded, pGroupInfo->offsetForNotLoaded); + pGroupInfo->offsetForNotLoaded++; return *pBlock; - } else { - setOperatorCompleted(pOperator); - clearPartitionOperator(pInfo); - return NULL; } + return NULL; } static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { @@ -857,9 +844,15 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { SDataGroupInfo* pGroupInfo = (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL; if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { + if(pGroupInfo != NULL) { + SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo); + if(ret != NULL) return ret; + } // try next group data if (pInfo->groupIndex + 1 >= taosArrayGetSize(pInfo->sortedGroupArray)) { - return buildPartitionResultForNotLoadBlock(pOperator); + setOperatorCompleted(pOperator); + clearPartitionOperator(pInfo); + return NULL; } ++pInfo->groupIndex; @@ -873,6 +866,22 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); T_LONG_JMP(pTaskInfo->env, terrno); } + if (*(int32_t*)page == 0) { + SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo); + if (ret != NULL) return ret; + releaseBufPage(pInfo->pBuf, page); + if (pInfo->pageIndex < taosArrayGetSize(pGroupInfo->pPageList)) { + pInfo->pageIndex += 1; + } else if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) { + pInfo->groupIndex++; + pInfo->pageIndex = 0; + } else { + setOperatorCompleted(pOperator); + clearPartitionOperator(pInfo); + return NULL; + } + return buildPartitionResult(pOperator); + } blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity); blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); @@ -882,6 +891,8 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId; pInfo->binfo.pRes->info.dataLoad = 1; pInfo->orderedRows = 0; + } else if (pInfo->pOrderInfoArr == NULL) { + qError("Exception, remainRows not zero, but pOrderInfoArr is NULL"); } if (pInfo->pOrderInfoArr) { @@ -944,6 +955,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { while (pGroupIter != NULL) { SDataGroupInfo* pGroupInfo = pGroupIter; taosArrayPush(groupArray, pGroupInfo); + static int i = 0; + qInfo("groupArray push %p %p %d times", pGroupInfo, pGroupInfo->blockForNotLoaded, ++i); pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3fb298e1ea..36de40a38a 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3628,7 +3628,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, pCtx->saveHandle.pState->tsIndex); ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); key.groupId = pSrcBlock->info.id.groupId; - key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);; + key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex); } char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);