fix(query): handle countAlwaysReturnValue behavior
when input data is empty
This commit is contained in:
parent
96df0cdd0d
commit
294af08f7a
|
@ -1595,6 +1595,62 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock **ppBlock) {
|
||||||
|
if (!tsCountAlwaysReturnValue) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||||
|
bool hasCountFunc = false;
|
||||||
|
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
|
||||||
|
if ((strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "count") == 0) ||
|
||||||
|
(strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "hyperloglog") == 0)) {
|
||||||
|
hasCountFunc = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!hasCountFunc) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pBlock = createDataBlock();
|
||||||
|
pBlock->info.rows = 0;
|
||||||
|
pBlock->info.capacity = 0;
|
||||||
|
pBlock->info.rowSize = 0;
|
||||||
|
pBlock->info.groupId = 0;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
|
||||||
|
SColumnInfoData pCol = {0};
|
||||||
|
pCol.hasNull = true;
|
||||||
|
pCol.info.type = TSDB_DATA_TYPE_NULL;
|
||||||
|
|
||||||
|
SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
|
||||||
|
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
|
||||||
|
SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
|
||||||
|
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
|
||||||
|
int32_t slotId = pFuncParam->pCol->slotId;
|
||||||
|
taosArrayPush(pBlock->pDataBlock, &pCol);
|
||||||
|
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppBlock = pBlock;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppBlock) {
|
||||||
|
if (!blockAllocated) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataDestroy(*ppBlock);
|
||||||
|
*ppBlock = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// this is a blocking operator
|
// this is a blocking operator
|
||||||
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
if (OPTR_IS_OPENED(pOperator)) {
|
if (OPTR_IS_OPENED(pOperator)) {
|
||||||
|
@ -1612,14 +1668,27 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
int32_t order = TSDB_ORDER_ASC;
|
int32_t order = TSDB_ORDER_ASC;
|
||||||
int32_t scanFlag = MAIN_SCAN;
|
int32_t scanFlag = MAIN_SCAN;
|
||||||
|
|
||||||
|
bool hasValidBlock = false;
|
||||||
|
bool blockAllocated = false;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
|
if (pBlock == NULL) {
|
||||||
|
if (!hasValidBlock) {
|
||||||
|
createDataBlockForEmptyInput(pOperator, &pBlock);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
blockAllocated = true;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
hasValidBlock = true;
|
||||||
|
|
||||||
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
|
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1628,6 +1697,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
|
SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
|
||||||
code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
|
code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1637,8 +1707,12 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
setInputDataBlock(pSup, pBlock, order, scanFlag, true);
|
setInputDataBlock(pSup, pBlock, order, scanFlag, true);
|
||||||
code = doAggregateImpl(pOperator, pSup->pCtx);
|
code = doAggregateImpl(pOperator, pSup->pCtx);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
|
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
|
||||||
|
|
Loading…
Reference in New Issue