From d9043d6984659ebe0d5e484a01ec0242a725d96d Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 13 Jun 2024 16:57:49 +0800 Subject: [PATCH 1/3] fix: unexpected udf function --- source/libs/executor/inc/executorInt.h | 2 +- source/libs/executor/src/aggregateoperator.c | 5 ++++- source/libs/executor/src/eventwindowoperator.c | 3 +-- source/libs/executor/src/executorInt.c | 9 +++++++-- source/libs/executor/src/groupoperator.c | 7 +++---- source/libs/executor/src/streamtimewindowoperator.c | 7 +++---- source/libs/executor/src/timewindowoperator.c | 7 ++----- source/libs/function/test/udf2.c | 1 + 8 files changed, 22 insertions(+), 19 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 592231f043..6c2327d692 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -861,7 +861,7 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, void setVgIdColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int32_t vgId); void setVgVerColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int64_t vgVer); -void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset); +int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset); void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput); SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index b5a49831c5..a7b4532bd0 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -406,7 +406,10 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin } } - setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); + int32_t ret = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } } // a new buffer page for each table. Needs to opt this design diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 29907e6f1f..d73274f85e 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -221,8 +221,7 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi (*pResult)->win = *win; - setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); - return TSDB_CODE_SUCCESS; + return setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); } static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSup, int32_t startIndex, int32_t endIndex, diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 43c04ca8d9..92a777a5cc 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -468,7 +468,7 @@ STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key) { return win; } -void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) { +int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) { bool init = false; for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset); @@ -487,7 +487,11 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO if (!pResInfo->initialized) { if (pCtx[i].functionId != -1) { - pCtx[i].fpSet.init(&pCtx[i], pResInfo); + bool ini = pCtx[i].fpSet.init(&pCtx[i], pResInfo); + if (!ini && fmIsUserDefinedFunc(pCtx[i].functionId)){ + pResInfo->initialized = false; + return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; + } } else { pResInfo->initialized = true; } @@ -495,6 +499,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO init = true; } } + return TSDB_CODE_SUCCESS; } void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9a31e993b2..d36c8ddf9c 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -319,7 +319,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + T_LONG_JMP(pTaskInfo->env, ret); } int32_t rowIndex = j - num; @@ -337,7 +337,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + T_LONG_JMP(pTaskInfo->env, ret); } int32_t rowIndex = pBlock->info.rows - num; @@ -1009,8 +1009,7 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, SResultRow* pResultRow = doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup, false); - setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset); - return TSDB_CODE_SUCCESS; + return setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset); } uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId) { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 2224942893..013b0f2f02 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -548,8 +548,8 @@ int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResu // set time window for current result res->win = (*win); - setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset); - return code; + if(code != TSDB_CODE_SUCCESS) return code; + return setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset); } bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup, @@ -1975,8 +1975,7 @@ static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pR *pResult = (SResultRow*)pWinInfo->pStatePos->pRowBuff; // set time window for current result (*pResult)->win = pWinInfo->sessionWin.win; - setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset); - return TSDB_CODE_SUCCESS; + return setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset); } int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult, diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b72811cdcc..cdcdc6ddd1 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -84,9 +84,7 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo pResultRow->win = (*win); *pResult = pResultRow; - setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); - - return TSDB_CODE_SUCCESS; + return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); } static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { @@ -1647,8 +1645,7 @@ static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWind // set time window for current result (*pResult)->win = (*win); - setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); - return TSDB_CODE_SUCCESS; + return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); } static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, diff --git a/source/libs/function/test/udf2.c b/source/libs/function/test/udf2.c index faf4daa4e5..e0b08f227f 100644 --- a/source/libs/function/test/udf2.c +++ b/source/libs/function/test/udf2.c @@ -10,6 +10,7 @@ DLL_EXPORT int32_t udf2_init() { return 0; } DLL_EXPORT int32_t udf2_destroy() { return 0; } DLL_EXPORT int32_t udf2_start(SUdfInterBuf* buf) { + if(buf->buf == NULL || buf->bufLen < (sizeof(int64_t))) return TSDB_CODE_UDF_INVALID_BUFSIZE; *(int64_t*)(buf->buf) = 0; buf->bufLen = sizeof(double); buf->numOfResult = 1; From a707a2921b015e532f6909ef5b23ee39d860f20c Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 13 Jun 2024 17:49:58 +0800 Subject: [PATCH 2/3] fix: udfScalarProcFunc exception --- source/libs/function/src/udfd.c | 2 +- source/libs/function/test/udf2.c | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 54745951cc..df97e873aa 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -691,7 +691,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { convertDataBlockToUdfDataBlock(&call->block, &input); code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx); freeUdfDataDataBlock(&input); - convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); + if(code == 0) convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); freeUdfColumn(&output); break; } diff --git a/source/libs/function/test/udf2.c b/source/libs/function/test/udf2.c index e0b08f227f..273b9c49c2 100644 --- a/source/libs/function/test/udf2.c +++ b/source/libs/function/test/udf2.c @@ -18,6 +18,7 @@ DLL_EXPORT int32_t udf2_start(SUdfInterBuf* buf) { } DLL_EXPORT int32_t udf2(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInterBuf* newInterBuf) { + if(newInterBuf->buf == NULL || newInterBuf->bufLen < (sizeof(double))) return TSDB_CODE_UDF_INVALID_BUFSIZE; double sumSquares = 0; if (interBuf->numOfResult == 1) { sumSquares = *(double*)interBuf->buf; From 2044cec6ca4e0f6fcb8fff25db5b996f6bec1d8e Mon Sep 17 00:00:00 2001 From: facetosea <25808407@qq.com> Date: Thu, 13 Jun 2024 21:06:58 +0800 Subject: [PATCH 3/3] test --- source/libs/executor/src/streamtimewindowoperator.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 013b0f2f02..2d73cf3cf6 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -548,8 +548,8 @@ int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResu // set time window for current result res->win = (*win); - if(code != TSDB_CODE_SUCCESS) return code; - return setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset); + setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset); + return code; } bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup,