fix: taos_connect in gpd_init method and taos_close in gpd_destroy method
This commit is contained in:
parent
d156bff782
commit
e0445e0698
|
@ -29,8 +29,6 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
// clang-foramt on
|
// clang-foramt on
|
||||||
|
|
||||||
SArray* udfdResidentFuncs = NULL;
|
|
||||||
|
|
||||||
typedef struct SUdfdContext {
|
typedef struct SUdfdContext {
|
||||||
uv_loop_t * loop;
|
uv_loop_t * loop;
|
||||||
uv_pipe_t ctrlPipe;
|
uv_pipe_t ctrlPipe;
|
||||||
|
@ -43,6 +41,8 @@ typedef struct SUdfdContext {
|
||||||
uv_mutex_t udfsMutex;
|
uv_mutex_t udfsMutex;
|
||||||
SHashObj * udfsHash;
|
SHashObj * udfsHash;
|
||||||
|
|
||||||
|
SArray* residentFuncs;
|
||||||
|
|
||||||
bool printVersion;
|
bool printVersion;
|
||||||
} SUdfdContext;
|
} SUdfdContext;
|
||||||
|
|
||||||
|
@ -204,8 +204,8 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
udf->initFunc();
|
udf->initFunc();
|
||||||
}
|
}
|
||||||
udf->resident = false;
|
udf->resident = false;
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(udfdResidentFuncs); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
|
||||||
char* funcName = taosArrayGet(udfdResidentFuncs, i);
|
char* funcName = taosArrayGet(global.residentFuncs, i);
|
||||||
if (strcmp(setup->udfName, funcName) == 0) {
|
if (strcmp(setup->udfName, funcName) == 0) {
|
||||||
udf->resident = true;
|
udf->resident = true;
|
||||||
break;
|
break;
|
||||||
|
@ -930,8 +930,6 @@ static int32_t udfdRun() {
|
||||||
uv_run(global.loop, UV_RUN_DEFAULT);
|
uv_run(global.loop, UV_RUN_DEFAULT);
|
||||||
uv_loop_close(global.loop);
|
uv_loop_close(global.loop);
|
||||||
|
|
||||||
uv_mutex_destroy(&global.udfsMutex);
|
|
||||||
taosHashCleanup(global.udfsHash);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -953,17 +951,17 @@ void udfdConnectMnodeThreadFunc(void *args) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t udfdInitResidentFuncs() {
|
int32_t udfdInitResidentFuncs() {
|
||||||
udfdResidentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
|
global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
|
||||||
char gpd[TSDB_FUNC_NAME_LEN] = "gpd";
|
char gpd[TSDB_FUNC_NAME_LEN] = "gpd";
|
||||||
taosArrayPush(udfdResidentFuncs, gpd);
|
taosArrayPush(global.residentFuncs, gpd);
|
||||||
char gpdBatch[TSDB_FUNC_NAME_LEN] = "gpdbatch";
|
char gpdBatch[TSDB_FUNC_NAME_LEN] = "gpdbatch";
|
||||||
taosArrayPush(udfdResidentFuncs, gpdBatch);
|
taosArrayPush(global.residentFuncs, gpdBatch);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t udfdDeinitResidentFuncs() {
|
int32_t udfdDeinitResidentFuncs() {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(udfdResidentFuncs); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
|
||||||
char* funcName = taosArrayGet(udfdResidentFuncs, i);
|
char* funcName = taosArrayGet(global.residentFuncs, i);
|
||||||
SUdf** udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
|
SUdf** udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
|
||||||
if (udfInHash) {
|
if (udfInHash) {
|
||||||
taosHashRemove(global.udfsHash, funcName, strlen(funcName));
|
taosHashRemove(global.udfsHash, funcName, strlen(funcName));
|
||||||
|
@ -975,9 +973,16 @@ int32_t udfdDeinitResidentFuncs() {
|
||||||
taosMemoryFree(udf);
|
taosMemoryFree(udf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
taosArrayDestroy(global.residentFuncs);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t udfdCleanup() {
|
||||||
|
uv_mutex_destroy(&global.udfsMutex);
|
||||||
|
taosHashCleanup(global.udfsHash);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
if (!taosCheckSystemIsLittleEnd()) {
|
if (!taosCheckSystemIsLittleEnd()) {
|
||||||
printf("failed to start since on non-little-end machines\n");
|
printf("failed to start since on non-little-end machines\n");
|
||||||
|
@ -1026,5 +1031,6 @@ int main(int argc, char *argv[]) {
|
||||||
udfdCloseClientRpc();
|
udfdCloseClientRpc();
|
||||||
|
|
||||||
udfdDeinitResidentFuncs();
|
udfdDeinitResidentFuncs();
|
||||||
|
udfdCleanup();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,12 +9,16 @@
|
||||||
#endif
|
#endif
|
||||||
#include "taosudf.h"
|
#include "taosudf.h"
|
||||||
|
|
||||||
|
TAOS* taos = NULL;
|
||||||
|
|
||||||
DLL_EXPORT int32_t gpd_init() {
|
DLL_EXPORT int32_t gpd_init() {
|
||||||
|
taos = taos_connect("localhost", "root", "taosdata", "", 7100);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
DLL_EXPORT int32_t gpd_destroy() {
|
DLL_EXPORT int32_t gpd_destroy() {
|
||||||
|
taos_close(taos);
|
||||||
|
taos_cleanup();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,11 +44,6 @@ DLL_EXPORT int32_t gpd(SUdfDataBlock* block, SUdfColumn *resultCol) {
|
||||||
udfColDataSet(resultCol, i, (char *)&luckyNum, false);
|
udfColDataSet(resultCol, i, (char *)&luckyNum, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taos_init();
|
|
||||||
TAOS* taos = taos_connect("localhost", "root", "taosdata", "", 7100);
|
|
||||||
if (taos == NULL) {
|
|
||||||
char* errstr = "can not connect";
|
|
||||||
}
|
|
||||||
TAOS_RES* res = taos_query(taos, "create database if not exists gpd");
|
TAOS_RES* res = taos_query(taos, "create database if not exists gpd");
|
||||||
if (taos_errno(res) != 0) {
|
if (taos_errno(res) != 0) {
|
||||||
char* errstr = taos_errstr(res);
|
char* errstr = taos_errstr(res);
|
||||||
|
@ -64,8 +63,6 @@ DLL_EXPORT int32_t gpd(SUdfDataBlock* block, SUdfColumn *resultCol) {
|
||||||
char* errstr = taos_errstr(res);
|
char* errstr = taos_errstr(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_close(taos);
|
|
||||||
taos_cleanup();
|
|
||||||
//to simulate actual processing delay by udf
|
//to simulate actual processing delay by udf
|
||||||
#ifdef LINUX
|
#ifdef LINUX
|
||||||
usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second)
|
usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second)
|
||||||
|
|
Loading…
Reference in New Issue