enh(query): add error check for scalar function calculation.

This commit is contained in:
Haojun Liao 2022-05-09 19:16:22 +08:00
parent 55d11618d0
commit 09f07dbbcf
3 changed files with 16 additions and 3 deletions

View File

@ -3979,6 +3979,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes); blockDataCleanup(pRes);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
@ -4042,9 +4043,13 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pTaskInfo->code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
pProjectInfo->pPseudoColInfo); pProjectInfo->pPseudoColInfo);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code);
}
int32_t status = handleLimitOffset(pOperator, pBlock); int32_t status = handleLimitOffset(pOperator, pBlock);
if (status == PROJECT_RETRIEVE_CONTINUE) { if (status == PROJECT_RETRIEVE_CONTINUE) {
continue; continue;

View File

@ -262,6 +262,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGroupbyOperatorInfo* pInfo = pOperator->info; SGroupbyOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes; SSDataBlock* pRes = pInfo->binfo.pRes;
@ -289,7 +291,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated right before apply the group aggregation. // there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->pScalarExprInfo != NULL) { if (pInfo->pScalarExprInfo != NULL) {
projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL); pTaskInfo->code = projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code);
}
} }
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);

View File

@ -114,7 +114,10 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
SOperatorInfo* pOperator = param; SOperatorInfo* pOperator = param;
SSortOperatorInfo* pSort = pOperator->info; SSortOperatorInfo* pSort = pOperator->info;
if (pOperator->pExpr != NULL) { if (pOperator->pExpr != NULL) {
projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL); int32_t code = projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pOperator->pTaskInfo->env, code);
}
} }
} }