Merge pull request #26144 from taosdata/fix/TD-30039/unexpectedUdfCreate

Fix/td 30039/unexpected udf create
This commit is contained in:
dapan1121 2024-06-17 08:35:02 +08:00 committed by GitHub
commit 52060f5fba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 22 additions and 18 deletions

View File

@ -863,7 +863,7 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData,
void setVgIdColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int32_t vgId);
void setVgVerColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int64_t vgVer);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,

View File

@ -406,7 +406,10 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
}
}
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
int32_t ret = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
// a new buffer page for each table. Needs to opt this design

View File

@ -221,8 +221,7 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi
(*pResult)->win = *win;
setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
return setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
}
static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSup, int32_t startIndex, int32_t endIndex,

View File

@ -468,7 +468,7 @@ STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key) {
return win;
}
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
bool init = false;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
@ -487,7 +487,11 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
if (!pResInfo->initialized) {
if (pCtx[i].functionId != -1) {
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
bool ini = pCtx[i].fpSet.init(&pCtx[i], pResInfo);
if (!ini && fmIsUserDefinedFunc(pCtx[i].functionId)){
pResInfo->initialized = false;
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
}
} else {
pResInfo->initialized = true;
}
@ -495,6 +499,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
init = true;
}
}
return TSDB_CODE_SUCCESS;
}
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {

View File

@ -319,7 +319,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
T_LONG_JMP(pTaskInfo->env, ret);
}
int32_t rowIndex = j - num;
@ -337,7 +337,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
T_LONG_JMP(pTaskInfo->env, ret);
}
int32_t rowIndex = pBlock->info.rows - num;
@ -1110,8 +1110,7 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo,
SResultRow* pResultRow =
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup, false);
setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
return setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
}
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId) {

View File

@ -1975,8 +1975,7 @@ static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pR
*pResult = (SResultRow*)pWinInfo->pStatePos->pRowBuff;
// set time window for current result
(*pResult)->win = pWinInfo->sessionWin.win;
setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
return setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
}
int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,

View File

@ -84,9 +84,7 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
pResultRow->win = (*win);
*pResult = pResultRow;
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
}
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
@ -1647,8 +1645,7 @@ static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWind
// set time window for current result
(*pResult)->win = (*win);
setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
}
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,

View File

@ -691,7 +691,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
convertDataBlockToUdfDataBlock(&call->block, &input);
code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx);
freeUdfDataDataBlock(&input);
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
if(code == 0) convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
freeUdfColumn(&output);
break;
}

View File

@ -10,6 +10,7 @@ DLL_EXPORT int32_t udf2_init() { return 0; }
DLL_EXPORT int32_t udf2_destroy() { return 0; }
DLL_EXPORT int32_t udf2_start(SUdfInterBuf* buf) {
if(buf->buf == NULL || buf->bufLen < (sizeof(int64_t))) return TSDB_CODE_UDF_INVALID_BUFSIZE;
*(int64_t*)(buf->buf) = 0;
buf->bufLen = sizeof(double);
buf->numOfResult = 1;
@ -17,6 +18,7 @@ DLL_EXPORT int32_t udf2_start(SUdfInterBuf* buf) {
}
DLL_EXPORT int32_t udf2(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInterBuf* newInterBuf) {
if(newInterBuf->buf == NULL || newInterBuf->bufLen < (sizeof(double))) return TSDB_CODE_UDF_INVALID_BUFSIZE;
double sumSquares = 0;
if (interBuf->numOfResult == 1) {
sumSquares = *(double*)interBuf->buf;