Merge pull request #12568 from taosdata/feature/udf
feat: refine udfd trace and error processing
This commit is contained in:
commit
cee955bf94
|
@ -87,6 +87,7 @@ typedef struct SUdfInterBuf {
|
||||||
} SUdfInterBuf;
|
} SUdfInterBuf;
|
||||||
typedef void *UdfcFuncHandle;
|
typedef void *UdfcFuncHandle;
|
||||||
|
|
||||||
|
//low level APIs
|
||||||
/**
|
/**
|
||||||
* setup udf
|
* setup udf
|
||||||
* @param udf, in
|
* @param udf, in
|
||||||
|
@ -115,6 +116,9 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
|
||||||
*/
|
*/
|
||||||
int32_t doTeardownUdf(UdfcFuncHandle handle);
|
int32_t doTeardownUdf(UdfcFuncHandle handle);
|
||||||
|
|
||||||
|
void freeUdfInterBuf(SUdfInterBuf *buf);
|
||||||
|
|
||||||
|
//high level APIs
|
||||||
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
|
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
|
||||||
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
|
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
|
||||||
|
|
|
@ -795,7 +795,6 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS
|
||||||
}
|
}
|
||||||
output->info.hasVarCol = hasVarCol;
|
output->info.hasVarCol = hasVarCol;
|
||||||
|
|
||||||
//TODO: free the array output->pDataBlock
|
|
||||||
output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
taosArrayPush(output->pDataBlock, (input + i)->columnData);
|
taosArrayPush(output->pDataBlock, (input + i)->columnData);
|
||||||
|
@ -809,8 +808,12 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
output->numOfRows = input->info.rows;
|
output->numOfRows = input->info.rows;
|
||||||
//TODO: memory
|
|
||||||
output->columnData = taosArrayGet(input->pDataBlock, 0);
|
output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData));
|
||||||
|
memcpy(output->columnData,
|
||||||
|
taosArrayGet(input->pDataBlock, 0),
|
||||||
|
sizeof(SColumnInfoData));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -833,7 +836,7 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *
|
||||||
fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);
|
fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);
|
||||||
if (uvTask->type == UV_TASK_REQ_RSP) {
|
if (uvTask->type == UV_TASK_REQ_RSP) {
|
||||||
if (uvTask->rspBuf.base != NULL) {
|
if (uvTask->rspBuf.base != NULL) {
|
||||||
SUdfResponse rsp;
|
SUdfResponse rsp = {0};
|
||||||
void* buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
|
void* buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
|
||||||
assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base));
|
assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base));
|
||||||
task->errCode = rsp.code;
|
task->errCode = rsp.code;
|
||||||
|
@ -1427,7 +1430,10 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
|
||||||
int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
|
int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
|
||||||
if (err == 0) {
|
if (err == 0) {
|
||||||
convertDataBlockToScalarParm(&resultBlock, output);
|
convertDataBlockToScalarParm(&resultBlock, output);
|
||||||
|
taosArrayDestroy(resultBlock.pDataBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(inputBlock.pDataBlock);
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1508,16 +1514,15 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
|
||||||
|
|
||||||
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
|
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
|
||||||
|
|
||||||
SUdfTeardownResponse *rsp = &task->_teardown.rsp;
|
|
||||||
int32_t err = task->errCode;
|
int32_t err = task->errCode;
|
||||||
|
|
||||||
udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
|
udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
|
||||||
|
|
||||||
|
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
|
||||||
|
|
||||||
taosMemoryFree(task->session);
|
taosMemoryFree(task->session);
|
||||||
taosMemoryFree(task);
|
taosMemoryFree(task);
|
||||||
|
|
||||||
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
|
|
||||||
|
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1564,6 +1569,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
|
||||||
}
|
}
|
||||||
udfRes->interResNum = buf.numOfResult;
|
udfRes->interResNum = buf.numOfResult;
|
||||||
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
|
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
|
||||||
|
freeUdfInterBuf(&buf);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1621,7 +1627,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||||
blockDataDestroy(inputBlock);
|
blockDataDestroy(inputBlock);
|
||||||
taosArrayDestroy(tempBlock.pDataBlock);
|
taosArrayDestroy(tempBlock.pDataBlock);
|
||||||
|
|
||||||
taosMemoryFree(newState.buf);
|
freeUdfInterBuf(&newState);
|
||||||
return udfCode;
|
return udfCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1650,6 +1656,8 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||||
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
|
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
freeUdfInterBuf(&resultBuf);
|
||||||
|
|
||||||
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
|
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
|
||||||
releaseUdfFuncHandle(pCtx->udfName);
|
releaseUdfFuncHandle(pCtx->udfName);
|
||||||
return udfCallCode == 0 ? numOfResults : udfCallCode;
|
return udfCallCode == 0 ? numOfResults : udfCallCode;
|
||||||
|
|
|
@ -96,10 +96,14 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
|
||||||
|
|
||||||
int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
||||||
strcpy(udf->name, udfName);
|
strcpy(udf->name, udfName);
|
||||||
|
int32_t err = 0;
|
||||||
|
err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf);
|
||||||
|
if (err != 0) {
|
||||||
|
fnError("can not retrieve udf from mnode. udf name %s", udfName);
|
||||||
|
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
|
||||||
|
}
|
||||||
|
|
||||||
udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf);
|
err = uv_dlopen(udf->path, &udf->lib);
|
||||||
//strcpy(udf->path, "/home/slzhou/TDengine/debug/build/lib/libudf1.so");
|
|
||||||
int err = uv_dlopen(udf->path, &udf->lib);
|
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
||||||
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
|
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
|
||||||
|
@ -142,7 +146,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
||||||
|
|
||||||
void udfdProcessSetupRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
void udfdProcessSetupRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
||||||
// TODO: tracable id from client. connect, setup, call, teardown
|
// TODO: tracable id from client. connect, setup, call, teardown
|
||||||
fnInfo("%" PRId64 " setup request. udf name: %s", request->seqNum, request->setup.udfName);
|
fnInfo( "setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
|
||||||
SUdfSetupRequest *setup = &request->setup;
|
SUdfSetupRequest *setup = &request->setup;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SUdf *udf = NULL;
|
SUdf *udf = NULL;
|
||||||
|
@ -276,7 +280,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
|
|
||||||
void udfdProcessTeardownRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
void udfdProcessTeardownRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
||||||
SUdfTeardownRequest *teardown = &request->teardown;
|
SUdfTeardownRequest *teardown = &request->teardown;
|
||||||
fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request->seqNum, teardown->udfHandle);
|
fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
|
||||||
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
|
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
|
||||||
SUdf *udf = handle->udf;
|
SUdf *udf = handle->udf;
|
||||||
bool unloadUdf = false;
|
bool unloadUdf = false;
|
||||||
|
@ -800,17 +804,11 @@ static int32_t udfdRun() {
|
||||||
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
uv_mutex_init(&global.udfsMutex);
|
uv_mutex_init(&global.udfsMutex);
|
||||||
|
|
||||||
if (udfdUvInit() != 0) {
|
|
||||||
fnError("uv init failure");
|
|
||||||
return -2;
|
|
||||||
}
|
|
||||||
|
|
||||||
fnInfo("start the udfd");
|
fnInfo("start the udfd");
|
||||||
int code = uv_run(global.loop, UV_RUN_DEFAULT);
|
int code = uv_run(global.loop, UV_RUN_DEFAULT);
|
||||||
fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code);
|
fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code);
|
||||||
int codeClose = uv_loop_close(global.loop);
|
int codeClose = uv_loop_close(global.loop);
|
||||||
fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
|
fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
|
||||||
removeListeningPipe();
|
|
||||||
uv_mutex_destroy(&global.udfsMutex);
|
uv_mutex_destroy(&global.udfsMutex);
|
||||||
taosHashCleanup(global.udfsHash);
|
taosHashCleanup(global.udfsHash);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -853,8 +851,14 @@ int main(int argc, char *argv[]) {
|
||||||
return -4;
|
return -4;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (udfdUvInit() != 0) {
|
||||||
|
fnError("uv init failure");
|
||||||
|
return -5;
|
||||||
|
}
|
||||||
|
|
||||||
udfdRun();
|
udfdRun();
|
||||||
|
|
||||||
udfdCloseClientRpc();
|
removeListeningPipe();
|
||||||
|
|
||||||
|
udfdCloseClientRpc();
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,20 +34,13 @@ static int32_t initLog() {
|
||||||
return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, NULL, 0);
|
return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int scalarFuncTest() {
|
||||||
parseArgs(argc, argv);
|
|
||||||
initLog();
|
|
||||||
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
|
|
||||||
fnError("failed to start since read config error");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
udfcOpen();
|
|
||||||
uv_sleep(1000);
|
|
||||||
|
|
||||||
UdfcFuncHandle handle;
|
UdfcFuncHandle handle;
|
||||||
|
|
||||||
doSetupUdf("udf1", &handle);
|
if (doSetupUdf("udf1", &handle) != 0) {
|
||||||
|
fnError("setup udf failure");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
SSDataBlock block = {0};
|
SSDataBlock block = {0};
|
||||||
SSDataBlock *pBlock = █
|
SSDataBlock *pBlock = █
|
||||||
|
@ -74,11 +67,78 @@ int main(int argc, char *argv[]) {
|
||||||
input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
|
input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
|
||||||
SScalarParam output = {0};
|
SScalarParam output = {0};
|
||||||
doCallUdfScalarFunc(handle, &input, 1, &output);
|
doCallUdfScalarFunc(handle, &input, 1, &output);
|
||||||
|
taosArrayDestroy(pBlock->pDataBlock);
|
||||||
SColumnInfoData *col = output.columnData;
|
SColumnInfoData *col = output.columnData;
|
||||||
for (int32_t i = 0; i < output.numOfRows; ++i) {
|
for (int32_t i = 0; i < output.numOfRows; ++i) {
|
||||||
fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t)));
|
fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
colDataDestroy(output.columnData);
|
||||||
|
taosMemoryFree(output.columnData);
|
||||||
|
|
||||||
doTeardownUdf(handle);
|
doTeardownUdf(handle);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int aggregateFuncTest() {
|
||||||
|
UdfcFuncHandle handle;
|
||||||
|
|
||||||
|
if (doSetupUdf("udf2", &handle) != 0) {
|
||||||
|
fnError("setup udf failure");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock block = {0};
|
||||||
|
SSDataBlock *pBlock = █
|
||||||
|
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
|
||||||
|
pBlock->info.numOfCols = 1;
|
||||||
|
pBlock->info.rows = 4;
|
||||||
|
char data[16] = {0};
|
||||||
|
char bitmap[4] = {0};
|
||||||
|
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||||
|
SColumnInfoData colInfo = {0};
|
||||||
|
colInfo.info.type = TSDB_DATA_TYPE_INT;
|
||||||
|
colInfo.info.bytes = sizeof(int32_t);
|
||||||
|
colInfo.info.colId = 1;
|
||||||
|
colInfo.pData = data;
|
||||||
|
colInfo.nullbitmap = bitmap;
|
||||||
|
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||||
|
colDataAppendInt32(&colInfo, j, &j);
|
||||||
|
}
|
||||||
|
taosArrayPush(pBlock->pDataBlock, &colInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
SUdfInterBuf buf = {0};
|
||||||
|
SUdfInterBuf newBuf = {0};
|
||||||
|
SUdfInterBuf resultBuf = {0};
|
||||||
|
doCallUdfAggInit(handle, &buf);
|
||||||
|
doCallUdfAggProcess(handle, pBlock, &buf, &newBuf);
|
||||||
|
taosArrayDestroy(pBlock->pDataBlock);
|
||||||
|
|
||||||
|
doCallUdfAggFinalize(handle, &newBuf, &resultBuf);
|
||||||
|
fprintf(stderr, "agg result: %f\n", *(double*)resultBuf.buf);
|
||||||
|
|
||||||
|
freeUdfInterBuf(&buf);
|
||||||
|
freeUdfInterBuf(&newBuf);
|
||||||
|
freeUdfInterBuf(&resultBuf);
|
||||||
|
doTeardownUdf(handle);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
parseArgs(argc, argv);
|
||||||
|
initLog();
|
||||||
|
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
|
||||||
|
fnError("failed to start since read config error");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
udfcOpen();
|
||||||
|
uv_sleep(1000);
|
||||||
|
|
||||||
|
scalarFuncTest();
|
||||||
|
aggregateFuncTest();
|
||||||
udfcClose();
|
udfcClose();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue