|
|
|
@ -140,6 +140,182 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void udfdProcessSetupRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
|
|
|
|
// TODO: tracable id from client. connect, setup, call, teardown
|
|
|
|
|
fnInfo("%" PRId64 " setup request. udf name: %s", request->seqNum, request->setup.udfName);
|
|
|
|
|
SUdfSetupRequest *setup = &request->setup;
|
|
|
|
|
int32_t code = TSDB_CODE_SUCCESS;
|
|
|
|
|
SUdf *udf = NULL;
|
|
|
|
|
uv_mutex_lock(&global.udfsMutex);
|
|
|
|
|
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
|
|
|
|
|
if (udfInHash) {
|
|
|
|
|
++(*udfInHash)->refCount;
|
|
|
|
|
udf = *udfInHash;
|
|
|
|
|
uv_mutex_unlock(&global.udfsMutex);
|
|
|
|
|
} else {
|
|
|
|
|
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
|
|
|
|
|
udfNew->refCount = 1;
|
|
|
|
|
udfNew->state = UDF_STATE_INIT;
|
|
|
|
|
|
|
|
|
|
uv_mutex_init(&udfNew->lock);
|
|
|
|
|
uv_cond_init(&udfNew->condReady);
|
|
|
|
|
udf = udfNew;
|
|
|
|
|
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew));
|
|
|
|
|
uv_mutex_unlock(&global.udfsMutex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
uv_mutex_lock(&udf->lock);
|
|
|
|
|
if (udf->state == UDF_STATE_INIT) {
|
|
|
|
|
udf->state = UDF_STATE_LOADING;
|
|
|
|
|
code = udfdLoadUdf(setup->udfName, udf);
|
|
|
|
|
if (udf->initFunc) {
|
|
|
|
|
udf->initFunc();
|
|
|
|
|
}
|
|
|
|
|
udf->state = UDF_STATE_READY;
|
|
|
|
|
uv_cond_broadcast(&udf->condReady);
|
|
|
|
|
uv_mutex_unlock(&udf->lock);
|
|
|
|
|
} else {
|
|
|
|
|
while (udf->state != UDF_STATE_READY) {
|
|
|
|
|
uv_cond_wait(&udf->condReady, &udf->lock);
|
|
|
|
|
}
|
|
|
|
|
uv_mutex_unlock(&udf->lock);
|
|
|
|
|
}
|
|
|
|
|
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
|
|
|
|
|
handle->udf = udf;
|
|
|
|
|
|
|
|
|
|
SUdfResponse rsp;
|
|
|
|
|
rsp.seqNum = request->seqNum;
|
|
|
|
|
rsp.type = request->type;
|
|
|
|
|
rsp.code = code;
|
|
|
|
|
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
|
|
|
|
rsp.setupRsp.outputType = udf->outputType;
|
|
|
|
|
rsp.setupRsp.outputLen = udf->outputLen;
|
|
|
|
|
rsp.setupRsp.bufSize = udf->bufSize;
|
|
|
|
|
|
|
|
|
|
int32_t len = encodeUdfResponse(NULL, &rsp);
|
|
|
|
|
rsp.msgLen = len;
|
|
|
|
|
void *bufBegin = taosMemoryMalloc(len);
|
|
|
|
|
void *buf = bufBegin;
|
|
|
|
|
encodeUdfResponse(&buf, &rsp);
|
|
|
|
|
|
|
|
|
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
|
|
|
|
|
|
|
|
|
taosMemoryFree(uvUdf->input.base);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
|
|
|
|
SUdfCallRequest *call = &request->call;
|
|
|
|
|
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType,
|
|
|
|
|
call->udfHandle);
|
|
|
|
|
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
|
|
|
|
|
SUdf *udf = handle->udf;
|
|
|
|
|
SUdfResponse response = {0};
|
|
|
|
|
SUdfResponse *rsp = &response;
|
|
|
|
|
SUdfCallResponse *subRsp = &rsp->callRsp;
|
|
|
|
|
|
|
|
|
|
int32_t code = TSDB_CODE_SUCCESS;
|
|
|
|
|
switch(call->callType) {
|
|
|
|
|
case TSDB_UDF_CALL_SCALA_PROC: {
|
|
|
|
|
SUdfColumn output = {0};
|
|
|
|
|
|
|
|
|
|
SUdfDataBlock input = {0};
|
|
|
|
|
convertDataBlockToUdfDataBlock(&call->block, &input);
|
|
|
|
|
code = udf->scalarProcFunc(&input, &output);
|
|
|
|
|
|
|
|
|
|
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
|
|
|
|
|
freeUdfColumn(&output);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case TSDB_UDF_CALL_AGG_INIT: {
|
|
|
|
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
|
|
|
|
.bufLen= udf->bufSize,
|
|
|
|
|
.numOfResult = 0};
|
|
|
|
|
udf->aggStartFunc(&outBuf);
|
|
|
|
|
subRsp->resultBuf = outBuf;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case TSDB_UDF_CALL_AGG_PROC: {
|
|
|
|
|
SUdfDataBlock input = {0};
|
|
|
|
|
convertDataBlockToUdfDataBlock(&call->block, &input);
|
|
|
|
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
|
|
|
|
.bufLen= udf->bufSize,
|
|
|
|
|
.numOfResult = 0};
|
|
|
|
|
code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
|
|
|
|
|
subRsp->resultBuf = outBuf;
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case TSDB_UDF_CALL_AGG_FIN: {
|
|
|
|
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
|
|
|
|
.bufLen= udf->bufSize,
|
|
|
|
|
.numOfResult = 0};
|
|
|
|
|
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
|
|
|
|
|
subRsp->resultBuf = outBuf;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
rsp->seqNum = request->seqNum;
|
|
|
|
|
rsp->type = request->type;
|
|
|
|
|
rsp->code = code;
|
|
|
|
|
subRsp->callType = call->callType;
|
|
|
|
|
|
|
|
|
|
int32_t len = encodeUdfResponse(NULL, rsp);
|
|
|
|
|
rsp->msgLen = len;
|
|
|
|
|
void *bufBegin = taosMemoryMalloc(len);
|
|
|
|
|
void *buf = bufBegin;
|
|
|
|
|
encodeUdfResponse(&buf, rsp);
|
|
|
|
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
|
|
|
|
|
|
|
|
|
taosMemoryFree(uvUdf->input.base);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void udfdProcessTeardownRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
|
|
|
|
SUdfTeardownRequest *teardown = &request->teardown;
|
|
|
|
|
fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request->seqNum, teardown->udfHandle);
|
|
|
|
|
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
|
|
|
|
|
SUdf *udf = handle->udf;
|
|
|
|
|
bool unloadUdf = false;
|
|
|
|
|
int32_t code = TSDB_CODE_SUCCESS;
|
|
|
|
|
|
|
|
|
|
uv_mutex_lock(&global.udfsMutex);
|
|
|
|
|
udf->refCount--;
|
|
|
|
|
if (udf->refCount == 0) {
|
|
|
|
|
unloadUdf = true;
|
|
|
|
|
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
|
|
|
|
|
}
|
|
|
|
|
uv_mutex_unlock(&global.udfsMutex);
|
|
|
|
|
if (unloadUdf) {
|
|
|
|
|
uv_cond_destroy(&udf->condReady);
|
|
|
|
|
uv_mutex_destroy(&udf->lock);
|
|
|
|
|
if (udf->destroyFunc) {
|
|
|
|
|
(udf->destroyFunc)();
|
|
|
|
|
}
|
|
|
|
|
uv_dlclose(&udf->lib);
|
|
|
|
|
taosMemoryFree(udf);
|
|
|
|
|
}
|
|
|
|
|
taosMemoryFree(handle);
|
|
|
|
|
|
|
|
|
|
SUdfResponse response;
|
|
|
|
|
SUdfResponse *rsp = &response;
|
|
|
|
|
rsp->seqNum = request->seqNum;
|
|
|
|
|
rsp->type = request->type;
|
|
|
|
|
rsp->code = code;
|
|
|
|
|
int32_t len = encodeUdfResponse(NULL, rsp);
|
|
|
|
|
rsp->msgLen = len;
|
|
|
|
|
void *bufBegin = taosMemoryMalloc(len);
|
|
|
|
|
void *buf = bufBegin;
|
|
|
|
|
encodeUdfResponse(&buf, rsp);
|
|
|
|
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
|
|
|
|
|
|
|
|
|
taosMemoryFree(uvUdf->input.base);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void udfdProcessRequest(uv_work_t *req) {
|
|
|
|
|
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
|
|
|
|
|
SUdfRequest request = {0};
|
|
|
|
@ -147,172 +323,16 @@ void udfdProcessRequest(uv_work_t *req) {
|
|
|
|
|
|
|
|
|
|
switch (request.type) {
|
|
|
|
|
case UDF_TASK_SETUP: {
|
|
|
|
|
// TODO: tracable id from client. connect, setup, call, teardown
|
|
|
|
|
fnInfo("%" PRId64 " setup request. udf name: %s", request.seqNum, request.setup.udfName);
|
|
|
|
|
SUdfSetupRequest *setup = &request.setup;
|
|
|
|
|
|
|
|
|
|
SUdf *udf = NULL;
|
|
|
|
|
uv_mutex_lock(&global.udfsMutex);
|
|
|
|
|
SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName));
|
|
|
|
|
if (udfInHash) {
|
|
|
|
|
++(*udfInHash)->refCount;
|
|
|
|
|
udf = *udfInHash;
|
|
|
|
|
uv_mutex_unlock(&global.udfsMutex);
|
|
|
|
|
} else {
|
|
|
|
|
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
|
|
|
|
|
udfNew->refCount = 1;
|
|
|
|
|
udfNew->state = UDF_STATE_INIT;
|
|
|
|
|
|
|
|
|
|
uv_mutex_init(&udfNew->lock);
|
|
|
|
|
uv_cond_init(&udfNew->condReady);
|
|
|
|
|
udf = udfNew;
|
|
|
|
|
taosHashPut(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName), &udfNew, sizeof(&udfNew));
|
|
|
|
|
uv_mutex_unlock(&global.udfsMutex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
uv_mutex_lock(&udf->lock);
|
|
|
|
|
if (udf->state == UDF_STATE_INIT) {
|
|
|
|
|
udf->state = UDF_STATE_LOADING;
|
|
|
|
|
udfdLoadUdf(setup->udfName, udf);
|
|
|
|
|
if (udf->initFunc) {
|
|
|
|
|
udf->initFunc();
|
|
|
|
|
}
|
|
|
|
|
udf->state = UDF_STATE_READY;
|
|
|
|
|
uv_cond_broadcast(&udf->condReady);
|
|
|
|
|
uv_mutex_unlock(&udf->lock);
|
|
|
|
|
} else {
|
|
|
|
|
while (udf->state != UDF_STATE_READY) {
|
|
|
|
|
uv_cond_wait(&udf->condReady, &udf->lock);
|
|
|
|
|
}
|
|
|
|
|
uv_mutex_unlock(&udf->lock);
|
|
|
|
|
}
|
|
|
|
|
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
|
|
|
|
|
handle->udf = udf;
|
|
|
|
|
SUdfResponse rsp;
|
|
|
|
|
rsp.seqNum = request.seqNum;
|
|
|
|
|
rsp.type = request.type;
|
|
|
|
|
rsp.code = 0;
|
|
|
|
|
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
|
|
|
|
rsp.setupRsp.outputType = udf->outputType;
|
|
|
|
|
rsp.setupRsp.outputLen = udf->outputLen;
|
|
|
|
|
rsp.setupRsp.bufSize = udf->bufSize;
|
|
|
|
|
int32_t len = encodeUdfResponse(NULL, &rsp);
|
|
|
|
|
rsp.msgLen = len;
|
|
|
|
|
void *bufBegin = taosMemoryMalloc(len);
|
|
|
|
|
void *buf = bufBegin;
|
|
|
|
|
encodeUdfResponse(&buf, &rsp);
|
|
|
|
|
|
|
|
|
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
|
|
|
|
|
|
|
|
|
taosMemoryFree(uvUdf->input.base);
|
|
|
|
|
udfdProcessSetupRequest(uvUdf, &request);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case UDF_TASK_CALL: {
|
|
|
|
|
SUdfCallRequest *call = &request.call;
|
|
|
|
|
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request.seqNum, call->callType,
|
|
|
|
|
call->udfHandle);
|
|
|
|
|
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
|
|
|
|
|
SUdf *udf = handle->udf;
|
|
|
|
|
SUdfResponse response = {0};
|
|
|
|
|
SUdfResponse *rsp = &response;
|
|
|
|
|
SUdfCallResponse *subRsp = &rsp->callRsp;
|
|
|
|
|
|
|
|
|
|
switch(call->callType) {
|
|
|
|
|
case TSDB_UDF_CALL_SCALA_PROC: {
|
|
|
|
|
SUdfColumn output = {0};
|
|
|
|
|
|
|
|
|
|
SUdfDataBlock input = {0};
|
|
|
|
|
convertDataBlockToUdfDataBlock(&call->block, &input);
|
|
|
|
|
udf->scalarProcFunc(&input, &output);
|
|
|
|
|
|
|
|
|
|
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
|
|
|
|
|
freeUdfColumn(&output);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case TSDB_UDF_CALL_AGG_INIT: {
|
|
|
|
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
|
|
|
|
.bufLen= udf->bufSize,
|
|
|
|
|
.numOfResult = 0};
|
|
|
|
|
udf->aggStartFunc(&outBuf);
|
|
|
|
|
subRsp->resultBuf = outBuf;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case TSDB_UDF_CALL_AGG_PROC: {
|
|
|
|
|
SUdfDataBlock input = {0};
|
|
|
|
|
convertDataBlockToUdfDataBlock(&call->block, &input);
|
|
|
|
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
|
|
|
|
.bufLen= udf->bufSize,
|
|
|
|
|
.numOfResult = 0};
|
|
|
|
|
udf->aggProcFunc(&input, &call->interBuf, &outBuf);
|
|
|
|
|
subRsp->resultBuf = outBuf;
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case TSDB_UDF_CALL_AGG_FIN: {
|
|
|
|
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
|
|
|
|
.bufLen= udf->bufSize,
|
|
|
|
|
.numOfResult = 0};
|
|
|
|
|
udf->aggFinishFunc(&call->interBuf, &outBuf);
|
|
|
|
|
subRsp->resultBuf = outBuf;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
rsp->seqNum = request.seqNum;
|
|
|
|
|
rsp->type = request.type;
|
|
|
|
|
rsp->code = 0;
|
|
|
|
|
subRsp->callType = call->callType;
|
|
|
|
|
|
|
|
|
|
int32_t len = encodeUdfResponse(NULL, rsp);
|
|
|
|
|
rsp->msgLen = len;
|
|
|
|
|
void *bufBegin = taosMemoryMalloc(len);
|
|
|
|
|
void *buf = bufBegin;
|
|
|
|
|
encodeUdfResponse(&buf, rsp);
|
|
|
|
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
|
|
|
|
|
|
|
|
|
taosMemoryFree(uvUdf->input.base);
|
|
|
|
|
udfdProcessCallRequest(uvUdf, &request);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case UDF_TASK_TEARDOWN: {
|
|
|
|
|
SUdfTeardownRequest *teardown = &request.teardown;
|
|
|
|
|
fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request.seqNum, teardown->udfHandle) SUdfcFuncHandle *handle =
|
|
|
|
|
(SUdfcFuncHandle *)(teardown->udfHandle);
|
|
|
|
|
SUdf *udf = handle->udf;
|
|
|
|
|
bool unloadUdf = false;
|
|
|
|
|
uv_mutex_lock(&global.udfsMutex);
|
|
|
|
|
udf->refCount--;
|
|
|
|
|
if (udf->refCount == 0) {
|
|
|
|
|
unloadUdf = true;
|
|
|
|
|
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
|
|
|
|
|
}
|
|
|
|
|
uv_mutex_unlock(&global.udfsMutex);
|
|
|
|
|
if (unloadUdf) {
|
|
|
|
|
uv_cond_destroy(&udf->condReady);
|
|
|
|
|
uv_mutex_destroy(&udf->lock);
|
|
|
|
|
if (udf->destroyFunc) {
|
|
|
|
|
(udf->destroyFunc)();
|
|
|
|
|
}
|
|
|
|
|
uv_dlclose(&udf->lib);
|
|
|
|
|
taosMemoryFree(udf);
|
|
|
|
|
}
|
|
|
|
|
taosMemoryFree(handle);
|
|
|
|
|
|
|
|
|
|
SUdfResponse response;
|
|
|
|
|
SUdfResponse *rsp = &response;
|
|
|
|
|
rsp->seqNum = request.seqNum;
|
|
|
|
|
rsp->type = request.type;
|
|
|
|
|
rsp->code = 0;
|
|
|
|
|
int32_t len = encodeUdfResponse(NULL, rsp);
|
|
|
|
|
rsp->msgLen = len;
|
|
|
|
|
void *bufBegin = taosMemoryMalloc(len);
|
|
|
|
|
void *buf = bufBegin;
|
|
|
|
|
encodeUdfResponse(&buf, rsp);
|
|
|
|
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
|
|
|
|
|
|
|
|
|
taosMemoryFree(uvUdf->input.base);
|
|
|
|
|
udfdProcessTeardownRequest(uvUdf, &request);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
default: {
|
|
|
|
|