calculate stream fill row buff size

This commit is contained in:
54liuyao 2024-09-30 11:36:58 +08:00
parent a0d7b0cde4
commit c0340388c5
1 changed files with 14 additions and 11 deletions

View File

@ -1165,12 +1165,12 @@ _end:
return code; return code;
} }
static int32_t initResultBuf(SStreamFillSupporter* pFillSup) { static int32_t initResultBuf(SSDataBlock* pInputRes, SStreamFillSupporter* pFillSup) {
pFillSup->rowSize = sizeof(SResultCellData) * pFillSup->numOfAllCols; int32_t numOfCols = taosArrayGetSize(pInputRes->pDataBlock);
for (int i = 0; i < pFillSup->numOfAllCols; i++) { pFillSup->rowSize = sizeof(SResultCellData) * numOfCols;
SFillColInfo* pCol = &pFillSup->pAllColInfo[i]; for (int i = 0; i < numOfCols; i++) {
SResSchema* pSchema = &pCol->pExpr->base.resSchema; SColumnInfoData* pCol = taosArrayGet(pInputRes->pDataBlock, i);
pFillSup->rowSize += pSchema->bytes; pFillSup->rowSize += pCol->info.bytes;
} }
pFillSup->next.key = INT64_MIN; pFillSup->next.key = INT64_MIN;
pFillSup->nextNext.key = INT64_MIN; pFillSup->nextNext.key = INT64_MIN;
@ -1185,7 +1185,7 @@ static int32_t initResultBuf(SStreamFillSupporter* pFillSup) {
} }
static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNode, SInterval* pInterval, static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNode, SInterval* pInterval,
SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI) { SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI, SSDataBlock* pInputRes) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter)); SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
@ -1214,7 +1214,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
pFillSup->interval = *pInterval; pFillSup->interval = *pInterval;
pFillSup->pAPI = pAPI; pFillSup->pAPI = pAPI;
code = initResultBuf(pFillSup); code = initResultBuf(pInputRes, pFillSup);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
SExprInfo* noFillExpr = NULL; SExprInfo* noFillExpr = NULL;
@ -1371,7 +1371,11 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore); code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI); pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno);
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI,
pInfo->pSrcBlock);
if (!pInfo->pFillSup) { if (!pInfo->pFillSup) {
code = TSDB_CODE_FAILED; code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
@ -1380,8 +1384,7 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno);
code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);