Merge pull request #12518 from taosdata/feature/udf
fix: change function name for tearing down udf handles
This commit is contained in:
commit
8a867099b0
|
@ -122,7 +122,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
|
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
|
||||||
|
|
||||||
int32_t teardownUdfs();
|
int32_t cleanUpUdfs();
|
||||||
// end API to taosd and qworker
|
// end API to taosd and qworker
|
||||||
//=============================================================================================================================
|
//=============================================================================================================================
|
||||||
// begin API to UDF writer.
|
// begin API to UDF writer.
|
||||||
|
|
|
@ -157,7 +157,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
||||||
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
|
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
|
||||||
pTaskInfo->totalRows += current;
|
pTaskInfo->totalRows += current;
|
||||||
|
|
||||||
teardownUdfs();
|
cleanUpUdfs();
|
||||||
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
||||||
GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);
|
GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);
|
||||||
|
|
||||||
|
|
|
@ -25,8 +25,6 @@
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
|
||||||
//TODO: add unit test
|
//TODO: add unit test
|
||||||
//TODO: include all global variable under context struct
|
|
||||||
|
|
||||||
typedef struct SUdfdData {
|
typedef struct SUdfdData {
|
||||||
bool startCalled;
|
bool startCalled;
|
||||||
bool needCleanUp;
|
bool needCleanUp;
|
||||||
|
@ -1275,7 +1273,6 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
||||||
fnInfo("udfc setup udf. udfName: %s", udfName);
|
|
||||||
if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
|
if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
|
||||||
return TSDB_CODE_UDF_INVALID_STATE;
|
return TSDB_CODE_UDF_INVALID_STATE;
|
||||||
}
|
}
|
||||||
|
@ -1305,7 +1302,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
||||||
if (task->errCode != 0) {
|
if (task->errCode != 0) {
|
||||||
fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode)
|
fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode)
|
||||||
} else {
|
} else {
|
||||||
fnInfo("sucessfully setup udf func handle. handle: %p", task->session);
|
fnInfo("sucessfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
|
||||||
*funcHandle = task->session;
|
*funcHandle = task->session;
|
||||||
}
|
}
|
||||||
int32_t err = task->errCode;
|
int32_t err = task->errCode;
|
||||||
|
@ -1490,13 +1487,11 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO: when to teardown udf. teardown udf is not called
|
|
||||||
int32_t doTeardownUdf(UdfcFuncHandle handle) {
|
int32_t doTeardownUdf(UdfcFuncHandle handle) {
|
||||||
fnInfo("tear down udf. udf func handle: %p", handle);
|
|
||||||
|
|
||||||
SUdfcUvSession *session = (SUdfcUvSession *) handle;
|
SUdfcUvSession *session = (SUdfcUvSession *) handle;
|
||||||
|
|
||||||
if (session->udfUvPipe == NULL) {
|
if (session->udfUvPipe == NULL) {
|
||||||
fnError("pipe to udfd does not exist");
|
fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName);
|
||||||
return TSDB_CODE_UDF_PIPE_NO_PIPE;
|
return TSDB_CODE_UDF_PIPE_NO_PIPE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1511,7 +1506,6 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
|
||||||
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
|
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
|
||||||
|
|
||||||
SUdfTeardownResponse *rsp = &task->_teardown.rsp;
|
SUdfTeardownResponse *rsp = &task->_teardown.rsp;
|
||||||
|
|
||||||
int32_t err = task->errCode;
|
int32_t err = task->errCode;
|
||||||
|
|
||||||
udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
|
udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
|
||||||
|
@ -1519,6 +1513,8 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
|
||||||
taosMemoryFree(task->session);
|
taosMemoryFree(task->session);
|
||||||
taosMemoryFree(task);
|
taosMemoryFree(task);
|
||||||
|
|
||||||
|
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
|
||||||
|
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1651,25 +1647,22 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||||
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
|
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
// int32_t code = doTeardownUdf(session);
|
|
||||||
// if (code != 0) {
|
|
||||||
// fnError("udfAggFinalize error. doTeardownUdf step. udf code: %d", code);
|
|
||||||
// }
|
|
||||||
|
|
||||||
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
|
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
|
||||||
releaseUdfFuncHandle(pCtx->udfName);
|
releaseUdfFuncHandle(pCtx->udfName);
|
||||||
return udfCallCode == 0 ? numOfResults : udfCallCode;
|
return udfCallCode == 0 ? numOfResults : udfCallCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t teardownUdfs() {
|
int32_t cleanUpUdfs() {
|
||||||
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
|
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
|
||||||
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
|
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
|
||||||
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
|
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
|
||||||
if (stub->refCount == 0) {
|
if (stub->refCount == 0) {
|
||||||
|
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
|
||||||
doTeardownUdf(stub->handle);
|
doTeardownUdf(stub->handle);
|
||||||
} else {
|
} else {
|
||||||
|
fnInfo("udf still in use. udf name: %s, ref count: %d, handle: %p", stub->udfName, stub->refCount, stub->handle);
|
||||||
taosArrayPush(udfStubs, stub);
|
taosArrayPush(udfStubs, stub);
|
||||||
}
|
}
|
||||||
++i;
|
++i;
|
||||||
|
|
Loading…
Reference in New Issue