fix: handle function of udf not implemented
This commit is contained in:
parent
b0ab4be6f4
commit
a39ada58a5
|
@ -1705,7 +1705,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
||||||
if (task->errCode != 0) {
|
if (task->errCode != 0) {
|
||||||
fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode)
|
fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode)
|
||||||
} else {
|
} else {
|
||||||
fnInfo("sucessfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
|
fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
|
||||||
*funcHandle = task->session;
|
*funcHandle = task->session;
|
||||||
}
|
}
|
||||||
int32_t err = task->errCode;
|
int32_t err = task->errCode;
|
||||||
|
|
|
@ -236,7 +236,7 @@ typedef struct SUvUdfWork {
|
||||||
struct SUvUdfWork *pWorkNext;
|
struct SUvUdfWork *pWorkNext;
|
||||||
} SUvUdfWork;
|
} SUvUdfWork;
|
||||||
|
|
||||||
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState;
|
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY} EUdfState;
|
||||||
|
|
||||||
typedef struct SUdf {
|
typedef struct SUdf {
|
||||||
char name[TSDB_FUNC_NAME_LEN + 1];
|
char name[TSDB_FUNC_NAME_LEN + 1];
|
||||||
|
@ -570,6 +570,14 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) {
|
||||||
uv_cond_init(&udfNew->condReady);
|
uv_cond_init(&udfNew->condReady);
|
||||||
|
|
||||||
udf = udfNew;
|
udf = udfNew;
|
||||||
|
udf->resident = false;
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
|
||||||
|
char *funcName = taosArrayGet(global.residentFuncs, i);
|
||||||
|
if (strcmp(udfName, funcName) == 0) {
|
||||||
|
udf->resident = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
SUdf **pUdf = &udf;
|
SUdf **pUdf = &udf;
|
||||||
taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES);
|
taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES);
|
||||||
uv_mutex_unlock(&global.udfsMutex);
|
uv_mutex_unlock(&global.udfsMutex);
|
||||||
|
@ -591,20 +599,15 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
if (udf->state == UDF_STATE_INIT) {
|
if (udf->state == UDF_STATE_INIT) {
|
||||||
udf->state = UDF_STATE_LOADING;
|
udf->state = UDF_STATE_LOADING;
|
||||||
code = udfdInitUdf(setup->udfName, udf);
|
code = udfdInitUdf(setup->udfName, udf);
|
||||||
|
if (code == 0) {
|
||||||
udf->resident = false;
|
udf->state = UDF_STATE_READY;
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
|
} else {
|
||||||
char *funcName = taosArrayGet(global.residentFuncs, i);
|
udf->state = UDF_STATE_INIT;
|
||||||
if (strcmp(setup->udfName, funcName) == 0) {
|
|
||||||
udf->resident = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
udf->state = UDF_STATE_READY;
|
|
||||||
uv_cond_broadcast(&udf->condReady);
|
uv_cond_broadcast(&udf->condReady);
|
||||||
uv_mutex_unlock(&udf->lock);
|
uv_mutex_unlock(&udf->lock);
|
||||||
} else {
|
} else {
|
||||||
while (udf->state != UDF_STATE_READY) {
|
while (udf->state == UDF_STATE_LOADING) {
|
||||||
uv_cond_wait(&udf->condReady, &udf->lock);
|
uv_cond_wait(&udf->condReady, &udf->lock);
|
||||||
}
|
}
|
||||||
uv_mutex_unlock(&udf->lock);
|
uv_mutex_unlock(&udf->lock);
|
||||||
|
|
Loading…
Reference in New Issue