diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 354f4d8752..d4d8696aba 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -133,6 +133,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { if (ret != TSDB_CODE_SUCCESS) { publishQueryAbortEvent(pTaskInfo, ret); pTaskInfo->code = ret; + cleanUpUdfs(); qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); return pTaskInfo->code; diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 068998e469..5f20d2e50a 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1364,9 +1364,12 @@ void releaseUdfFuncHandle(char* udfName) { SUdfcFuncStub key = {0}; strcpy(key.udfName, udfName); SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); - ASSERT(foundStub); - --foundStub->refCount; - ASSERT(foundStub->refCount>=0); + if (!foundStub) { + return; + } + if (foundStub->refCount > 0) { + --foundStub->refCount; + } uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); } @@ -1377,7 +1380,7 @@ int32_t cleanUpUdfs() { while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); if (stub->refCount == 0) { - fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle); + fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); doTeardownUdf(stub->handle); } else { fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p", @@ -1530,12 +1533,15 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, } SUdfcUvSession *session = handle; code = doCallUdfScalarFunc(handle, input, numOfCols, output); - if (session->outputType != output->columnData->info.type - || session->outputLen != output->columnData->info.bytes) { - fnError("udfc scalar function calculate error, session type: %d(%d), output type: %d(%d)", - session->outputType, session->outputLen, - output->columnData->info.type, output->columnData->info.bytes); + if (output->columnData == NULL) { + fnError("udfc scalar function calculate error. no column data"); code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; + } else { + if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) { + fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", session->outputType, + session->outputLen, output->columnData->info.type, output->columnData->info.bytes); + code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; + } } releaseUdfFuncHandle(udfName); return code; @@ -1565,7 +1571,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle); - taosMemoryFree(task->session); + taosMemoryFree(session); taosMemoryFree(task); return err; @@ -1573,7 +1579,6 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { //memory layout |---SUdfAggRes----|-----final result-----|---inter result----| typedef struct SUdfAggRes { - SUdfcUvSession *session; int8_t finalResNum; int8_t interResNum; char* finalResBuf; @@ -1606,7 +1611,6 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; - udfRes->session = (SUdfcUvSession *)handle; SUdfInterBuf buf = {0}; if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); @@ -1621,22 +1625,26 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult releaseUdfFuncHandle(pCtx->udfName); return false; } + releaseUdfFuncHandle(pCtx->udfName); freeUdfInterBuf(&buf); return true; } int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { - SInputColumnInfoData* pInput = &pCtx->input; - int32_t numOfCols = pInput->numOfInputCols; - - SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - SUdfcUvSession *session = udfRes->session; - if (session == NULL) { - return TSDB_CODE_UDF_NO_FUNC_HANDLE; + int32_t udfCode = 0; + UdfcFuncHandle handle = 0; + if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { + fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode); + return udfCode; } + + SUdfcUvSession *session = handle; + SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; + SInputColumnInfoData* pInput = &pCtx->input; + int32_t numOfCols = pInput->numOfInputCols; int32_t start = pInput->startRowIndex; int32_t numOfRows = pInput->numOfRows; @@ -1664,7 +1672,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { .numOfResult = udfRes->interResNum}; SUdfInterBuf newState = {0}; - int32_t udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState); + udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState); if (udfCode != 0) { fnError("udfAggProcess error. code: %d", udfCode); newState.numOfResult = 0; @@ -1684,19 +1692,21 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { blockDataDestroy(inputBlock); taosArrayDestroy(tempBlock.pDataBlock); - if (udfCode != 0) { - releaseUdfFuncHandle(pCtx->udfName); - } + releaseUdfFuncHandle(pCtx->udfName); freeUdfInterBuf(&newState); return udfCode; } int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { - SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - SUdfcUvSession *session = udfRes->session; - if (session == NULL) { - return TSDB_CODE_UDF_NO_FUNC_HANDLE; + int32_t udfCode = 0; + UdfcFuncHandle handle = 0; + if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { + fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode); + return udfCode; } + + SUdfcUvSession *session = handle; + SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;