fix: unexpected udf function

This commit is contained in:
factosea 2024-06-13 16:57:49 +08:00
parent 9c42c90209
commit d9043d6984
8 changed files with 22 additions and 19 deletions

View File

@ -861,7 +861,7 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData,
void setVgIdColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int32_t vgId);
void setVgVerColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int64_t vgVer);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,

View File

@ -406,7 +406,10 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
}
}
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
int32_t ret = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
// a new buffer page for each table. Needs to opt this design

View File

@ -221,8 +221,7 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi
(*pResult)->win = *win;
setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
return setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
}
static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSup, int32_t startIndex, int32_t endIndex,

View File

@ -468,7 +468,7 @@ STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key) {
return win;
}
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
bool init = false;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
@ -487,7 +487,11 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
if (!pResInfo->initialized) {
if (pCtx[i].functionId != -1) {
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
bool ini = pCtx[i].fpSet.init(&pCtx[i], pResInfo);
if (!ini && fmIsUserDefinedFunc(pCtx[i].functionId)){
pResInfo->initialized = false;
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
}
} else {
pResInfo->initialized = true;
}
@ -495,6 +499,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
init = true;
}
}
return TSDB_CODE_SUCCESS;
}
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {

View File

@ -319,7 +319,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
T_LONG_JMP(pTaskInfo->env, ret);
}
int32_t rowIndex = j - num;
@ -337,7 +337,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
T_LONG_JMP(pTaskInfo->env, ret);
}
int32_t rowIndex = pBlock->info.rows - num;
@ -1009,8 +1009,7 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo,
SResultRow* pResultRow =
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup, false);
setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
return setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
}
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId) {

View File

@ -548,8 +548,8 @@ int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResu
// set time window for current result
res->win = (*win);
setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset);
return code;
if(code != TSDB_CODE_SUCCESS) return code;
return setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset);
}
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup,
@ -1975,8 +1975,7 @@ static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pR
*pResult = (SResultRow*)pWinInfo->pStatePos->pRowBuff;
// set time window for current result
(*pResult)->win = pWinInfo->sessionWin.win;
setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
return setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
}
int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,

View File

@ -84,9 +84,7 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
pResultRow->win = (*win);
*pResult = pResultRow;
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
}
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
@ -1647,8 +1645,7 @@ static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWind
// set time window for current result
(*pResult)->win = (*win);
setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
}
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,

View File

@ -10,6 +10,7 @@ DLL_EXPORT int32_t udf2_init() { return 0; }
DLL_EXPORT int32_t udf2_destroy() { return 0; }
DLL_EXPORT int32_t udf2_start(SUdfInterBuf* buf) {
if(buf->buf == NULL || buf->bufLen < (sizeof(int64_t))) return TSDB_CODE_UDF_INVALID_BUFSIZE;
*(int64_t*)(buf->buf) = 0;
buf->bufLen = sizeof(double);
buf->numOfResult = 1;