return error code of udf execution failure

This commit is contained in:
Ganlin Zhao 2023-07-06 15:05:49 +08:00
parent cde9eac954
commit 19d4c79ac6
1 changed files with 13 additions and 6 deletions

View File

@ -38,7 +38,7 @@ typedef struct SIndefOperatorInfo {
SSDataBlock* pNextGroupRes; SSDataBlock* pNextGroupRes;
} SIndefOperatorInfo; } SIndefOperatorInfo;
static SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator); static int32_t doGenerateSourceData(SOperatorInfo* pOperator);
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator); static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator);
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator); static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator);
static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols); static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols);
@ -200,7 +200,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
if (newGroup) { if (newGroup) {
resetLimitInfoForNextGroup(pLimitInfo); resetLimitInfoForNextGroup(pLimitInfo);
} }
return PROJECT_RETRIEVE_CONTINUE; return PROJECT_RETRIEVE_CONTINUE;
} }
@ -252,7 +252,12 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo; SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo;
if (downstream == NULL) { if (downstream == NULL) {
return doGenerateSourceData(pOperator); code = doGenerateSourceData(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
return (pRes->info.rows > 0) ? pRes : NULL;
} }
while (1) { while (1) {
@ -601,7 +606,7 @@ SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) {
return pList; return pList;
} }
SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { int32_t doGenerateSourceData(SOperatorInfo* pOperator) {
SProjectOperatorInfo* pProjectInfo = pOperator->info; SProjectOperatorInfo* pProjectInfo = pOperator->info;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
@ -658,7 +663,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pBlockList); taosArrayDestroy(pBlockList);
return NULL; return code;
} }
int32_t startOffset = pRes->info.rows; int32_t startOffset = pRes->info.rows;
@ -668,6 +673,8 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
taosArrayDestroy(pBlockList); taosArrayDestroy(pBlockList);
} }
} else {
return TSDB_CODE_OPS_NOT_SUPPORT;
} }
} }
@ -683,7 +690,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
} }
return (pRes->info.rows > 0) ? pRes : NULL; return TSDB_CODE_SUCCESS;
} }
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) { static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {