Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/alter_table
This commit is contained in:
commit
8e9ccfd1d0
|
@ -122,7 +122,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
|
|||
|
||||
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
|
||||
|
||||
int32_t teardownUdfs();
|
||||
int32_t cleanUpUdfs();
|
||||
// end API to taosd and qworker
|
||||
//=============================================================================================================================
|
||||
// begin API to UDF writer.
|
||||
|
|
|
@ -157,7 +157,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
|
||||
pTaskInfo->totalRows += current;
|
||||
|
||||
teardownUdfs();
|
||||
cleanUpUdfs();
|
||||
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
||||
GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);
|
||||
|
||||
|
|
|
@ -25,8 +25,6 @@
|
|||
#include "functionMgt.h"
|
||||
|
||||
//TODO: add unit test
|
||||
//TODO: include all global variable under context struct
|
||||
|
||||
typedef struct SUdfdData {
|
||||
bool startCalled;
|
||||
bool needCleanUp;
|
||||
|
@ -1275,7 +1273,6 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
|
|||
}
|
||||
|
||||
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
||||
fnInfo("udfc setup udf. udfName: %s", udfName);
|
||||
if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
|
||||
return TSDB_CODE_UDF_INVALID_STATE;
|
||||
}
|
||||
|
@ -1305,7 +1302,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
|||
if (task->errCode != 0) {
|
||||
fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode)
|
||||
} else {
|
||||
fnInfo("sucessfully setup udf func handle. handle: %p", task->session);
|
||||
fnInfo("sucessfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
|
||||
*funcHandle = task->session;
|
||||
}
|
||||
int32_t err = task->errCode;
|
||||
|
@ -1490,13 +1487,11 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
|
|||
return code;
|
||||
}
|
||||
|
||||
//TODO: when to teardown udf. teardown udf is not called
|
||||
int32_t doTeardownUdf(UdfcFuncHandle handle) {
|
||||
fnInfo("tear down udf. udf func handle: %p", handle);
|
||||
|
||||
SUdfcUvSession *session = (SUdfcUvSession *) handle;
|
||||
|
||||
if (session->udfUvPipe == NULL) {
|
||||
fnError("pipe to udfd does not exist");
|
||||
fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName);
|
||||
return TSDB_CODE_UDF_PIPE_NO_PIPE;
|
||||
}
|
||||
|
||||
|
@ -1511,7 +1506,6 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
|
|||
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
|
||||
|
||||
SUdfTeardownResponse *rsp = &task->_teardown.rsp;
|
||||
|
||||
int32_t err = task->errCode;
|
||||
|
||||
udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
|
||||
|
@ -1519,6 +1513,8 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
|
|||
taosMemoryFree(task->session);
|
||||
taosMemoryFree(task);
|
||||
|
||||
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
|
@ -1651,25 +1647,22 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
|||
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
|
||||
}
|
||||
|
||||
// int32_t code = doTeardownUdf(session);
|
||||
// if (code != 0) {
|
||||
// fnError("udfAggFinalize error. doTeardownUdf step. udf code: %d", code);
|
||||
// }
|
||||
|
||||
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
|
||||
releaseUdfFuncHandle(pCtx->udfName);
|
||||
return udfCallCode == 0 ? numOfResults : udfCallCode;
|
||||
}
|
||||
|
||||
int32_t teardownUdfs() {
|
||||
int32_t cleanUpUdfs() {
|
||||
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
||||
int32_t i = 0;
|
||||
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
|
||||
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
|
||||
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
|
||||
if (stub->refCount == 0) {
|
||||
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
|
||||
doTeardownUdf(stub->handle);
|
||||
} else {
|
||||
fnInfo("udf still in use. udf name: %s, ref count: %d, handle: %p", stub->udfName, stub->refCount, stub->handle);
|
||||
taosArrayPush(udfStubs, stub);
|
||||
}
|
||||
++i;
|
||||
|
|
Loading…
Reference in New Issue