udf: refactor code
This commit is contained in:
parent
7976b6abfd
commit
e56a650877
|
@ -139,6 +139,226 @@ static int32_t udfdUvInit();
|
|||
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg);
|
||||
static int32_t udfdRun();
|
||||
|
||||
void udfdProcessRequest(uv_work_t *req) {
|
||||
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
|
||||
SUdfRequest request = {0};
|
||||
decodeUdfRequest(uvUdf->input.base, &request);
|
||||
|
||||
switch (request.type) {
|
||||
case UDF_TASK_SETUP: {
|
||||
udfdProcessSetupRequest(uvUdf, &request);
|
||||
break;
|
||||
}
|
||||
|
||||
case UDF_TASK_CALL: {
|
||||
udfdProcessCallRequest(uvUdf, &request);
|
||||
break;
|
||||
}
|
||||
case UDF_TASK_TEARDOWN: {
|
||||
udfdProcessTeardownRequest(uvUdf, &request);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||
// TODO: tracable id from client. connect, setup, call, teardown
|
||||
fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
|
||||
SUdfSetupRequest *setup = &request->setup;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SUdf * udf = NULL;
|
||||
uv_mutex_lock(&global.udfsMutex);
|
||||
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
|
||||
if (udfInHash) {
|
||||
++(*udfInHash)->refCount;
|
||||
udf = *udfInHash;
|
||||
uv_mutex_unlock(&global.udfsMutex);
|
||||
} else {
|
||||
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
|
||||
udfNew->refCount = 1;
|
||||
udfNew->state = UDF_STATE_INIT;
|
||||
|
||||
uv_mutex_init(&udfNew->lock);
|
||||
uv_cond_init(&udfNew->condReady);
|
||||
udf = udfNew;
|
||||
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew));
|
||||
uv_mutex_unlock(&global.udfsMutex);
|
||||
}
|
||||
|
||||
uv_mutex_lock(&udf->lock);
|
||||
if (udf->state == UDF_STATE_INIT) {
|
||||
udf->state = UDF_STATE_LOADING;
|
||||
code = udfdLoadUdf(setup->udfName, udf);
|
||||
if (udf->initFunc) {
|
||||
udf->initFunc();
|
||||
}
|
||||
udf->state = UDF_STATE_READY;
|
||||
uv_cond_broadcast(&udf->condReady);
|
||||
uv_mutex_unlock(&udf->lock);
|
||||
} else {
|
||||
while (udf->state != UDF_STATE_READY) {
|
||||
uv_cond_wait(&udf->condReady, &udf->lock);
|
||||
}
|
||||
uv_mutex_unlock(&udf->lock);
|
||||
}
|
||||
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
|
||||
handle->udf = udf;
|
||||
|
||||
SUdfResponse rsp;
|
||||
rsp.seqNum = request->seqNum;
|
||||
rsp.type = request->type;
|
||||
rsp.code = code;
|
||||
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
||||
rsp.setupRsp.outputType = udf->outputType;
|
||||
rsp.setupRsp.outputLen = udf->outputLen;
|
||||
rsp.setupRsp.bufSize = udf->bufSize;
|
||||
|
||||
int32_t len = encodeUdfResponse(NULL, &rsp);
|
||||
rsp.msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
void *buf = bufBegin;
|
||||
encodeUdfResponse(&buf, &rsp);
|
||||
|
||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
return;
|
||||
}
|
||||
|
||||
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||
SUdfCallRequest *call = &request->call;
|
||||
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType, call->udfHandle);
|
||||
SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle);
|
||||
SUdf * udf = handle->udf;
|
||||
SUdfResponse response = {0};
|
||||
SUdfResponse * rsp = &response;
|
||||
SUdfCallResponse *subRsp = &rsp->callRsp;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (call->callType) {
|
||||
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||
SUdfColumn output = {0};
|
||||
|
||||
SUdfDataBlock input = {0};
|
||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||
code = udf->scalarProcFunc(&input, &output);
|
||||
freeUdfDataDataBlock(&input);
|
||||
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
|
||||
freeUdfColumn(&output);
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_INIT: {
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
|
||||
udf->aggStartFunc(&outBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_PROC: {
|
||||
SUdfDataBlock input = {0};
|
||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
|
||||
code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
|
||||
freeUdfInterBuf(&call->interBuf);
|
||||
freeUdfDataDataBlock(&input);
|
||||
subRsp->resultBuf = outBuf;
|
||||
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_FIN: {
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
|
||||
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
|
||||
freeUdfInterBuf(&call->interBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
rsp->seqNum = request->seqNum;
|
||||
rsp->type = request->type;
|
||||
rsp->code = code;
|
||||
subRsp->callType = call->callType;
|
||||
|
||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||
rsp->msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
void *buf = bufBegin;
|
||||
encodeUdfResponse(&buf, rsp);
|
||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||
|
||||
switch (call->callType) {
|
||||
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||
tDeleteSSDataBlock(&call->block);
|
||||
tDeleteSSDataBlock(&subRsp->resultData);
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_INIT: {
|
||||
freeUdfInterBuf(&subRsp->resultBuf);
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_PROC: {
|
||||
tDeleteSSDataBlock(&call->block);
|
||||
freeUdfInterBuf(&subRsp->resultBuf);
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_FIN: {
|
||||
freeUdfInterBuf(&subRsp->resultBuf);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
return;
|
||||
}
|
||||
|
||||
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||
SUdfTeardownRequest *teardown = &request->teardown;
|
||||
fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
|
||||
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
|
||||
SUdf * udf = handle->udf;
|
||||
bool unloadUdf = false;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
uv_mutex_lock(&global.udfsMutex);
|
||||
udf->refCount--;
|
||||
if (udf->refCount == 0) {
|
||||
unloadUdf = true;
|
||||
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
|
||||
}
|
||||
uv_mutex_unlock(&global.udfsMutex);
|
||||
if (unloadUdf) {
|
||||
uv_cond_destroy(&udf->condReady);
|
||||
uv_mutex_destroy(&udf->lock);
|
||||
if (udf->destroyFunc) {
|
||||
(udf->destroyFunc)();
|
||||
}
|
||||
uv_dlclose(&udf->lib);
|
||||
taosMemoryFree(udf);
|
||||
}
|
||||
taosMemoryFree(handle);
|
||||
|
||||
SUdfResponse response;
|
||||
SUdfResponse *rsp = &response;
|
||||
rsp->seqNum = request->seqNum;
|
||||
rsp->type = request->type;
|
||||
rsp->code = code;
|
||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||
rsp->msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
void *buf = bufBegin;
|
||||
encodeUdfResponse(&buf, rsp);
|
||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
return;
|
||||
}
|
||||
|
||||
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;
|
||||
ASSERT(pMsg->info.ahandle != NULL);
|
||||
|
@ -388,226 +608,6 @@ int32_t udfdCloseClientRpc() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void udfdProcessRequest(uv_work_t *req) {
|
||||
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
|
||||
SUdfRequest request = {0};
|
||||
decodeUdfRequest(uvUdf->input.base, &request);
|
||||
|
||||
switch (request.type) {
|
||||
case UDF_TASK_SETUP: {
|
||||
udfdProcessSetupRequest(uvUdf, &request);
|
||||
break;
|
||||
}
|
||||
|
||||
case UDF_TASK_CALL: {
|
||||
udfdProcessCallRequest(uvUdf, &request);
|
||||
break;
|
||||
}
|
||||
case UDF_TASK_TEARDOWN: {
|
||||
udfdProcessTeardownRequest(uvUdf, &request);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||
// TODO: tracable id from client. connect, setup, call, teardown
|
||||
fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
|
||||
SUdfSetupRequest *setup = &request->setup;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SUdf * udf = NULL;
|
||||
uv_mutex_lock(&global.udfsMutex);
|
||||
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
|
||||
if (udfInHash) {
|
||||
++(*udfInHash)->refCount;
|
||||
udf = *udfInHash;
|
||||
uv_mutex_unlock(&global.udfsMutex);
|
||||
} else {
|
||||
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
|
||||
udfNew->refCount = 1;
|
||||
udfNew->state = UDF_STATE_INIT;
|
||||
|
||||
uv_mutex_init(&udfNew->lock);
|
||||
uv_cond_init(&udfNew->condReady);
|
||||
udf = udfNew;
|
||||
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew));
|
||||
uv_mutex_unlock(&global.udfsMutex);
|
||||
}
|
||||
|
||||
uv_mutex_lock(&udf->lock);
|
||||
if (udf->state == UDF_STATE_INIT) {
|
||||
udf->state = UDF_STATE_LOADING;
|
||||
code = udfdLoadUdf(setup->udfName, udf);
|
||||
if (udf->initFunc) {
|
||||
udf->initFunc();
|
||||
}
|
||||
udf->state = UDF_STATE_READY;
|
||||
uv_cond_broadcast(&udf->condReady);
|
||||
uv_mutex_unlock(&udf->lock);
|
||||
} else {
|
||||
while (udf->state != UDF_STATE_READY) {
|
||||
uv_cond_wait(&udf->condReady, &udf->lock);
|
||||
}
|
||||
uv_mutex_unlock(&udf->lock);
|
||||
}
|
||||
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
|
||||
handle->udf = udf;
|
||||
|
||||
SUdfResponse rsp;
|
||||
rsp.seqNum = request->seqNum;
|
||||
rsp.type = request->type;
|
||||
rsp.code = code;
|
||||
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
||||
rsp.setupRsp.outputType = udf->outputType;
|
||||
rsp.setupRsp.outputLen = udf->outputLen;
|
||||
rsp.setupRsp.bufSize = udf->bufSize;
|
||||
|
||||
int32_t len = encodeUdfResponse(NULL, &rsp);
|
||||
rsp.msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
void *buf = bufBegin;
|
||||
encodeUdfResponse(&buf, &rsp);
|
||||
|
||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
return;
|
||||
}
|
||||
|
||||
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||
SUdfCallRequest *call = &request->call;
|
||||
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType, call->udfHandle);
|
||||
SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle);
|
||||
SUdf * udf = handle->udf;
|
||||
SUdfResponse response = {0};
|
||||
SUdfResponse * rsp = &response;
|
||||
SUdfCallResponse *subRsp = &rsp->callRsp;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (call->callType) {
|
||||
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||
SUdfColumn output = {0};
|
||||
|
||||
SUdfDataBlock input = {0};
|
||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||
code = udf->scalarProcFunc(&input, &output);
|
||||
freeUdfDataDataBlock(&input);
|
||||
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
|
||||
freeUdfColumn(&output);
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_INIT: {
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
|
||||
udf->aggStartFunc(&outBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_PROC: {
|
||||
SUdfDataBlock input = {0};
|
||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
|
||||
code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
|
||||
freeUdfInterBuf(&call->interBuf);
|
||||
freeUdfDataDataBlock(&input);
|
||||
subRsp->resultBuf = outBuf;
|
||||
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_FIN: {
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
|
||||
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
|
||||
freeUdfInterBuf(&call->interBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
rsp->seqNum = request->seqNum;
|
||||
rsp->type = request->type;
|
||||
rsp->code = code;
|
||||
subRsp->callType = call->callType;
|
||||
|
||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||
rsp->msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
void *buf = bufBegin;
|
||||
encodeUdfResponse(&buf, rsp);
|
||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||
|
||||
switch (call->callType) {
|
||||
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||
tDeleteSSDataBlock(&call->block);
|
||||
tDeleteSSDataBlock(&subRsp->resultData);
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_INIT: {
|
||||
freeUdfInterBuf(&subRsp->resultBuf);
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_PROC: {
|
||||
tDeleteSSDataBlock(&call->block);
|
||||
freeUdfInterBuf(&subRsp->resultBuf);
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_FIN: {
|
||||
freeUdfInterBuf(&subRsp->resultBuf);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
return;
|
||||
}
|
||||
|
||||
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||
SUdfTeardownRequest *teardown = &request->teardown;
|
||||
fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
|
||||
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
|
||||
SUdf * udf = handle->udf;
|
||||
bool unloadUdf = false;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
uv_mutex_lock(&global.udfsMutex);
|
||||
udf->refCount--;
|
||||
if (udf->refCount == 0) {
|
||||
unloadUdf = true;
|
||||
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
|
||||
}
|
||||
uv_mutex_unlock(&global.udfsMutex);
|
||||
if (unloadUdf) {
|
||||
uv_cond_destroy(&udf->condReady);
|
||||
uv_mutex_destroy(&udf->lock);
|
||||
if (udf->destroyFunc) {
|
||||
(udf->destroyFunc)();
|
||||
}
|
||||
uv_dlclose(&udf->lib);
|
||||
taosMemoryFree(udf);
|
||||
}
|
||||
taosMemoryFree(handle);
|
||||
|
||||
SUdfResponse response;
|
||||
SUdfResponse *rsp = &response;
|
||||
rsp->seqNum = request->seqNum;
|
||||
rsp->type = request->type;
|
||||
rsp->code = code;
|
||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||
rsp->msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
void *buf = bufBegin;
|
||||
encodeUdfResponse(&buf, rsp);
|
||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
return;
|
||||
}
|
||||
|
||||
void udfdOnWrite(uv_write_t *req, int status) {
|
||||
SUvUdfWork *work = (SUvUdfWork *)req->data;
|
||||
if (status < 0) {
|
||||
|
|
Loading…
Reference in New Issue