feat: refine udfd
This commit is contained in:
parent
730851ecc3
commit
5a15151b87
|
@ -96,10 +96,14 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
|
||||||
|
|
||||||
int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
||||||
strcpy(udf->name, udfName);
|
strcpy(udf->name, udfName);
|
||||||
|
int32_t err = 0;
|
||||||
|
err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf);
|
||||||
|
if (err != 0) {
|
||||||
|
fnError("can not retrieve udf from mnode. udf name %s", udfName);
|
||||||
|
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
|
||||||
|
}
|
||||||
|
|
||||||
udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf);
|
err = uv_dlopen(udf->path, &udf->lib);
|
||||||
//strcpy(udf->path, "/home/slzhou/TDengine/debug/build/lib/libudf1.so");
|
|
||||||
int err = uv_dlopen(udf->path, &udf->lib);
|
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
||||||
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
|
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
|
||||||
|
@ -142,7 +146,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
||||||
|
|
||||||
void udfdProcessSetupRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
void udfdProcessSetupRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
||||||
// TODO: tracable id from client. connect, setup, call, teardown
|
// TODO: tracable id from client. connect, setup, call, teardown
|
||||||
fnInfo("%" PRId64 " setup request. udf name: %s", request->seqNum, request->setup.udfName);
|
fnInfo( "setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
|
||||||
SUdfSetupRequest *setup = &request->setup;
|
SUdfSetupRequest *setup = &request->setup;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SUdf *udf = NULL;
|
SUdf *udf = NULL;
|
||||||
|
@ -276,7 +280,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
|
|
||||||
void udfdProcessTeardownRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
void udfdProcessTeardownRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
||||||
SUdfTeardownRequest *teardown = &request->teardown;
|
SUdfTeardownRequest *teardown = &request->teardown;
|
||||||
fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request->seqNum, teardown->udfHandle);
|
fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
|
||||||
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
|
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
|
||||||
SUdf *udf = handle->udf;
|
SUdf *udf = handle->udf;
|
||||||
bool unloadUdf = false;
|
bool unloadUdf = false;
|
||||||
|
@ -800,11 +804,6 @@ static int32_t udfdRun() {
|
||||||
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
uv_mutex_init(&global.udfsMutex);
|
uv_mutex_init(&global.udfsMutex);
|
||||||
|
|
||||||
if (udfdUvInit() != 0) {
|
|
||||||
fnError("uv init failure");
|
|
||||||
return -2;
|
|
||||||
}
|
|
||||||
|
|
||||||
fnInfo("start the udfd");
|
fnInfo("start the udfd");
|
||||||
int code = uv_run(global.loop, UV_RUN_DEFAULT);
|
int code = uv_run(global.loop, UV_RUN_DEFAULT);
|
||||||
fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code);
|
fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code);
|
||||||
|
@ -853,6 +852,11 @@ int main(int argc, char *argv[]) {
|
||||||
return -4;
|
return -4;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (udfdUvInit() != 0) {
|
||||||
|
fnError("uv init failure");
|
||||||
|
return -5;
|
||||||
|
}
|
||||||
|
|
||||||
udfdRun();
|
udfdRun();
|
||||||
|
|
||||||
udfdCloseClientRpc();
|
udfdCloseClientRpc();
|
||||||
|
|
|
@ -47,7 +47,10 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
UdfcFuncHandle handle;
|
UdfcFuncHandle handle;
|
||||||
|
|
||||||
doSetupUdf("udf1", &handle);
|
if (doSetupUdf("udf1", &handle) != 0) {
|
||||||
|
fnError("setup udf failure");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
SSDataBlock block = {0};
|
SSDataBlock block = {0};
|
||||||
SSDataBlock *pBlock = █
|
SSDataBlock *pBlock = █
|
||||||
|
|
Loading…
Reference in New Issue