Merge pull request #28206 from taosdata/fix/TD-32397-main
calculate stream fill row buff size
This commit is contained in:
commit
1f3e65e3a9
|
@ -1165,12 +1165,12 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t initResultBuf(SStreamFillSupporter* pFillSup) {
|
||||
pFillSup->rowSize = sizeof(SResultCellData) * pFillSup->numOfAllCols;
|
||||
for (int i = 0; i < pFillSup->numOfAllCols; i++) {
|
||||
SFillColInfo* pCol = &pFillSup->pAllColInfo[i];
|
||||
SResSchema* pSchema = &pCol->pExpr->base.resSchema;
|
||||
pFillSup->rowSize += pSchema->bytes;
|
||||
static int32_t initResultBuf(SSDataBlock* pInputRes, SStreamFillSupporter* pFillSup) {
|
||||
int32_t numOfCols = taosArrayGetSize(pInputRes->pDataBlock);
|
||||
pFillSup->rowSize = sizeof(SResultCellData) * numOfCols;
|
||||
for (int i = 0; i < numOfCols; i++) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pInputRes->pDataBlock, i);
|
||||
pFillSup->rowSize += pCol->info.bytes;
|
||||
}
|
||||
pFillSup->next.key = INT64_MIN;
|
||||
pFillSup->nextNext.key = INT64_MIN;
|
||||
|
@ -1185,7 +1185,7 @@ static int32_t initResultBuf(SStreamFillSupporter* pFillSup) {
|
|||
}
|
||||
|
||||
static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNode, SInterval* pInterval,
|
||||
SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI) {
|
||||
SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI, SSDataBlock* pInputRes) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
|
||||
|
@ -1214,7 +1214,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
|
|||
pFillSup->interval = *pInterval;
|
||||
pFillSup->pAPI = pAPI;
|
||||
|
||||
code = initResultBuf(pFillSup);
|
||||
code = initResultBuf(pInputRes, pFillSup);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
SExprInfo* noFillExpr = NULL;
|
||||
|
@ -1371,7 +1371,11 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
|
|||
code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI);
|
||||
pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno);
|
||||
|
||||
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI,
|
||||
pInfo->pSrcBlock);
|
||||
if (!pInfo->pFillSup) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -1380,8 +1384,7 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno);
|
||||
|
||||
code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
|
Loading…
Reference in New Issue