diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 291cc3b67b..826220581a 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1165,12 +1165,12 @@ _end: return code; } -static int32_t initResultBuf(SStreamFillSupporter* pFillSup) { - pFillSup->rowSize = sizeof(SResultCellData) * pFillSup->numOfAllCols; - for (int i = 0; i < pFillSup->numOfAllCols; i++) { - SFillColInfo* pCol = &pFillSup->pAllColInfo[i]; - SResSchema* pSchema = &pCol->pExpr->base.resSchema; - pFillSup->rowSize += pSchema->bytes; +static int32_t initResultBuf(SSDataBlock* pInputRes, SStreamFillSupporter* pFillSup) { + int32_t numOfCols = taosArrayGetSize(pInputRes->pDataBlock); + pFillSup->rowSize = sizeof(SResultCellData) * numOfCols; + for (int i = 0; i < numOfCols; i++) { + SColumnInfoData* pCol = taosArrayGet(pInputRes->pDataBlock, i); + pFillSup->rowSize += pCol->info.bytes; } pFillSup->next.key = INT64_MIN; pFillSup->nextNext.key = INT64_MIN; @@ -1185,7 +1185,7 @@ static int32_t initResultBuf(SStreamFillSupporter* pFillSup) { } static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNode, SInterval* pInterval, - SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI) { + SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI, SSDataBlock* pInputRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter)); @@ -1214,7 +1214,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod pFillSup->interval = *pInterval; pFillSup->pAPI = pAPI; - code = initResultBuf(pFillSup); + code = initResultBuf(pInputRes, pFillSup); QUERY_CHECK_CODE(code, lino, _end); SExprInfo* noFillExpr = NULL; @@ -1371,7 +1371,11 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); - pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI); + pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno); + + pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI, + pInfo->pSrcBlock); if (!pInfo->pFillSup) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _error); @@ -1380,8 +1384,7 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); - pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); - QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno); + code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error);