diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 4319dd379a..a743465755 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1595,6 +1595,62 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan } } +static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock **ppBlock) { + if (!tsCountAlwaysReturnValue) { + return TSDB_CODE_SUCCESS; + } + + SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; + bool hasCountFunc = false; + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { + if ((strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "count") == 0) || + (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "hyperloglog") == 0)) { + hasCountFunc = true; + break; + } + } + + if (!hasCountFunc) { + return TSDB_CODE_SUCCESS; + } + + SSDataBlock* pBlock = createDataBlock(); + pBlock->info.rows = 0; + pBlock->info.capacity = 0; + pBlock->info.rowSize = 0; + pBlock->info.groupId = 0; + + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { + SColumnInfoData pCol = {0}; + pCol.hasNull = true; + pCol.info.type = TSDB_DATA_TYPE_NULL; + + SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i]; + for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { + SFunctParam* pFuncParam = &pOneExpr->base.pParam[j]; + if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { + int32_t slotId = pFuncParam->pCol->slotId; + taosArrayPush(pBlock->pDataBlock, &pCol); + } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { + } + } + } + + *ppBlock = pBlock; + + return TSDB_CODE_SUCCESS; +} + +static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppBlock) { + if (!blockAllocated) { + return; + } + + blockDataDestroy(*ppBlock); + *ppBlock = NULL; +} + + // this is a blocking operator static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { if (OPTR_IS_OPENED(pOperator)) { @@ -1612,14 +1668,27 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; + bool hasValidBlock = false; + bool blockAllocated = false; + while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { - break; + if (!hasValidBlock) { + createDataBlockForEmptyInput(pOperator, &pBlock); + if (pBlock == NULL) { + break; + } + blockAllocated = true; + } else { + break; + } } + hasValidBlock = true; int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); if (code != TSDB_CODE_SUCCESS) { + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); } @@ -1628,6 +1697,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SExprSupp* pSup1 = &pAggInfo->scalarExprSup; code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL); if (code != TSDB_CODE_SUCCESS) { + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); } } @@ -1637,8 +1707,12 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { setInputDataBlock(pSup, pBlock, order, scanFlag, true); code = doAggregateImpl(pOperator, pSup->pCtx); if (code != 0) { + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); } + + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); + } initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);