Merge pull request #12657 from taosdata/feature/udf

fix: improve udf function misuse stability
This commit is contained in:
shenglian-zhou 2022-05-18 18:37:57 +08:00 committed by GitHub
commit 1fecbd2587
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 27 deletions

View File

@ -133,6 +133,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pTaskInfo, ret); publishQueryAbortEvent(pTaskInfo, ret);
pTaskInfo->code = ret; pTaskInfo->code = ret;
cleanUpUdfs();
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
tstrerror(pTaskInfo->code)); tstrerror(pTaskInfo->code));
return pTaskInfo->code; return pTaskInfo->code;

View File

@ -1364,9 +1364,12 @@ void releaseUdfFuncHandle(char* udfName) {
SUdfcFuncStub key = {0}; SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName); strcpy(key.udfName, udfName);
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
ASSERT(foundStub); if (!foundStub) {
--foundStub->refCount; return;
ASSERT(foundStub->refCount>=0); }
if (foundStub->refCount > 0) {
--foundStub->refCount;
}
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
} }
@ -1377,7 +1380,7 @@ int32_t cleanUpUdfs() {
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
if (stub->refCount == 0) { if (stub->refCount == 0) {
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle); fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle); doTeardownUdf(stub->handle);
} else { } else {
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p", fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
@ -1530,12 +1533,15 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
} }
SUdfcUvSession *session = handle; SUdfcUvSession *session = handle;
code = doCallUdfScalarFunc(handle, input, numOfCols, output); code = doCallUdfScalarFunc(handle, input, numOfCols, output);
if (session->outputType != output->columnData->info.type if (output->columnData == NULL) {
|| session->outputLen != output->columnData->info.bytes) { fnError("udfc scalar function calculate error. no column data");
fnError("udfc scalar function calculate error, session type: %d(%d), output type: %d(%d)",
session->outputType, session->outputLen,
output->columnData->info.type, output->columnData->info.bytes);
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
} else {
if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) {
fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", session->outputType,
session->outputLen, output->columnData->info.type, output->columnData->info.bytes);
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
} }
releaseUdfFuncHandle(udfName); releaseUdfFuncHandle(udfName);
return code; return code;
@ -1565,7 +1571,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle); fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
taosMemoryFree(task->session); taosMemoryFree(session);
taosMemoryFree(task); taosMemoryFree(task);
return err; return err;
@ -1573,7 +1579,6 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----| //memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
typedef struct SUdfAggRes { typedef struct SUdfAggRes {
SUdfcUvSession *session;
int8_t finalResNum; int8_t finalResNum;
int8_t interResNum; int8_t interResNum;
char* finalResBuf; char* finalResBuf;
@ -1606,7 +1611,6 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
udfRes->session = (SUdfcUvSession *)handle;
SUdfInterBuf buf = {0}; SUdfInterBuf buf = {0};
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
@ -1621,22 +1625,26 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
releaseUdfFuncHandle(pCtx->udfName); releaseUdfFuncHandle(pCtx->udfName);
return false; return false;
} }
releaseUdfFuncHandle(pCtx->udfName);
freeUdfInterBuf(&buf); freeUdfInterBuf(&buf);
return true; return true;
} }
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; int32_t udfCode = 0;
int32_t numOfCols = pInput->numOfInputCols; UdfcFuncHandle handle = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode);
SUdfcUvSession *session = udfRes->session; return udfCode;
if (session == NULL) {
return TSDB_CODE_UDF_NO_FUNC_HANDLE;
} }
SUdfcUvSession *session = handle;
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SInputColumnInfoData* pInput = &pCtx->input;
int32_t numOfCols = pInput->numOfInputCols;
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows; int32_t numOfRows = pInput->numOfRows;
@ -1664,7 +1672,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
.numOfResult = udfRes->interResNum}; .numOfResult = udfRes->interResNum};
SUdfInterBuf newState = {0}; SUdfInterBuf newState = {0};
int32_t udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState); udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
if (udfCode != 0) { if (udfCode != 0) {
fnError("udfAggProcess error. code: %d", udfCode); fnError("udfAggProcess error. code: %d", udfCode);
newState.numOfResult = 0; newState.numOfResult = 0;
@ -1684,19 +1692,21 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
blockDataDestroy(inputBlock); blockDataDestroy(inputBlock);
taosArrayDestroy(tempBlock.pDataBlock); taosArrayDestroy(tempBlock.pDataBlock);
if (udfCode != 0) { releaseUdfFuncHandle(pCtx->udfName);
releaseUdfFuncHandle(pCtx->udfName);
}
freeUdfInterBuf(&newState); freeUdfInterBuf(&newState);
return udfCode; return udfCode;
} }
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); int32_t udfCode = 0;
SUdfcUvSession *session = udfRes->session; UdfcFuncHandle handle = 0;
if (session == NULL) { if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
return TSDB_CODE_UDF_NO_FUNC_HANDLE; fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode);
return udfCode;
} }
SUdfcUvSession *session = handle;
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;