enhance: change udf func body dir to tsDataDir/.udf

This commit is contained in:
slzhou 2023-04-09 20:31:25 +08:00
parent dc86bf9671
commit b725c9e192
1 changed files with 45 additions and 13 deletions

View File

@ -229,6 +229,7 @@ typedef struct SUdfdContext {
SArray *residentFuncs;
char udfDataDir[PATH_MAX];
bool printVersion;
} SUdfdContext;
@ -390,12 +391,13 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
}
if (plugin->openFunc) {
int16_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(tsDataDir) + 1 + 1; // tsDataDir:tsUdfdLdLibPath
char *pythonPath = taosMemoryMalloc(lenPythonPath);
int16_t lenPythonPath =
strlen(tsUdfdLdLibPath) + strlen(global.udfDataDir) + 1 + 1; // global.udfDataDir:tsUdfdLdLibPath
char *pythonPath = taosMemoryMalloc(lenPythonPath);
#ifdef WINDOWS
snprintf(pythonPath, lenPythonPath, "%s;%s", tsDataDir, tsUdfdLdLibPath);
snprintf(pythonPath, lenPythonPath, "%s;%s", global.udfDataDir, tsUdfdLdLibPath);
#else
snprintf(pythonPath, lenPythonPath, "%s:%s", tsDataDir, tsUdfdLdLibPath);
snprintf(pythonPath, lenPythonPath, "%s:%s", global.udfDataDir, tsUdfdLdLibPath);
#endif
SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}};
err = plugin->openFunc(items, 2);
@ -567,7 +569,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) {
SUdf *udfdNewUdf(const char *udfName) {
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
udfNew->refCount = 1;
udfNew->lastFetchTime = taosGetTimestampUs();
udfNew->lastFetchTime = taosGetTimestampMs();
strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN);
udfNew->state = UDF_STATE_INIT;
@ -592,15 +594,19 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) {
int64_t currTime = taosGetTimestampSec();
bool expired = false;
if (pUdfHash) {
expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000 * 1000; // 10s
expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s
if (!expired) {
++(*pUdfHash)->refCount;
SUdf *udf = *pUdfHash;
uv_mutex_unlock(&global.udfsMutex);
fnInfo("udfd reuse existing udf. udf %s udf version %d, udf created time %" PRIx64, udf->name, udf->version,
udf->createdTime);
return udf;
} else {
(*pUdfHash)->expired = true;
taosHashRemove(global.udfsHash, udfName, strlen(udfName));
fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64,
(*pUdfHash)->name, (*pUdfHash)->version, (*pUdfHash)->createdTime);
}
}
@ -814,21 +820,22 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
void udfdGetFuncBodyPath(const SUdf *udf, char *path) {
if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", tsDataDir, udf->name, udf->version, udf->createdTime);
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime);
#else
snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, udf->createdTime);
snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version,
udf->createdTime);
#endif
} else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime);
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
#else
snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime);
snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
#endif
} else {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, tsDataDir, udf->name, udf->version, udf->createdTime);
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
#else
snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, tsDataDir, udf->name, udf->version, udf->createdTime);
snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
#endif
}
}
@ -845,6 +852,7 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) {
bool fileExist = !(taosStatFile(path, NULL, NULL) < 0);
if (fileExist) {
strncpy(udf->path, path, PATH_MAX);
fnInfo("udfd func body file. reuse existing file %s", path);
return TSDB_CODE_SUCCESS;
}
@ -1429,6 +1437,24 @@ int32_t udfdCleanup() {
return 0;
}
int32_t udfdCreateUdfSourceDir() {
snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsDataDir);
int32_t code = taosMkDir(global.udfDataDir);
if (code != TSDB_CODE_SUCCESS) {
snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsTempDir);
code = taosMkDir(global.udfDataDir);
}
fnInfo("udfd create udf source directory %s. result: %s", global.udfDataDir, tstrerror(code));
return code;
}
int32_t udfdDestroyUdfSourceDir() {
fnInfo("destory udf source directory %s", global.udfDataDir);
taosRemoveDir(global.udfDataDir);
return 0;
}
int main(int argc, char *argv[]) {
if (!taosCheckSystemIsLittleEnd()) {
printf("failed to start since on non-little-end machines\n");
@ -1457,10 +1483,15 @@ int main(int argc, char *argv[]) {
initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp);
if (udfdOpenClientRpc() != 0) {
fnError("open rpc connection to mnode failure");
fnError("open rpc connection to mnode failed");
return -3;
}
if (udfdCreateUdfSourceDir() != 0) {
fnError("create udf source directory failed");
return -4;
}
if (udfdUvInit() != 0) {
fnError("uv init failure");
return -5;
@ -1474,6 +1505,7 @@ int main(int argc, char *argv[]) {
udfdRun();
removeListeningPipe();
udfdDestroyUdfSourceDir();
udfdCloseClientRpc();
udfdDeinitResidentFuncs();