enhance: udf code refactoring

This commit is contained in:
shenglian zhou 2022-05-22 16:03:46 +08:00
parent 3fe92062f1
commit 548838d9c3
1 changed files with 350 additions and 274 deletions

View File

@ -46,6 +46,13 @@ typedef struct SUdfdData {
SUdfdData udfdGlobal = {0};
static int32_t udfSpawnUdfd(SUdfdData *pData);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal);
static int32_t udfSpawnUdfd(SUdfdData* pData);
static void udfUdfdCloseWalkCb(uv_handle_t* handle, void* arg);
static void udfUdfdStopAsyncCb(uv_async_t *async);
static void udfWatchUdfd(void *args);
int32_t udfStartUdfd(int32_t startDnodeId);
int32_t udfStopUdfd();
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
@ -413,6 +420,34 @@ enum {
UDFC_STATE_STOPPING, // stopping after udfcClose
};
int32_t getUdfdPipeName(char* pipeName, int32_t size);
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup);
void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request);
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state);
void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state);
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call);
void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call);
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown);
void* decodeUdfTeardownRequest(const void* buf, SUdfTeardownRequest *teardown);
int32_t encodeUdfRequest(void** buf, const SUdfRequest* request);
void* decodeUdfRequest(const void* buf, SUdfRequest* request);
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp);
void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp);
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp);
void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp);
int32_t encodeUdfTeardownResponse(void** buf, const SUdfTeardownResponse* teardownRsp);
void* decodeUdfTeardownResponse(const void* buf, SUdfTeardownResponse* teardownResponse);
int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp);
void* decodeUdfResponse(const void* buf, SUdfResponse* rsp);
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta);
void freeUdfColumn(SUdfColumn* col);
void freeUdfDataDataBlock(SUdfDataBlock *block);
void freeUdfInterBuf(SUdfInterBuf *buf);
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output);
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output);
int32_t getUdfdPipeName(char* pipeName, int32_t size) {
char dnodeId[8] = {0};
size_t dnodeIdSize = sizeof(dnodeId);
@ -650,7 +685,7 @@ int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) {
len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
break;
default:
//TODO: log error
fnError("encode udf response, invalid udf response type %d", rsp->type);
break;
}
return len;
@ -676,7 +711,7 @@ void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp);
break;
default:
//TODO: log error
fnError("decode udf response, invalid udf response type %d", rsp->type);
break;
}
return (void*)buf;
@ -817,6 +852,319 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
return 0;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
typedef struct SUdfAggRes {
int8_t finalResNum;
int8_t interResNum;
char* finalResBuf;
char* interResBuf;
} SUdfAggRes;
void onUdfcPipeClose(uv_handle_t *handle);
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask);
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf);
void udfcUvHandleRsp(SClientUvConn *conn);
void udfcUvHandleError(SClientUvConn *conn);
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
void onUdfcPipetWrite(uv_write_t *write, int status);
void onUdfcPipeConnect(uv_connect_t *connect, int status);
int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask);
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask);
void udfcAsyncTaskCb(uv_async_t *async);
void cleanUpUvTasks(SUdfcProxy *udfc);
void udfStopAsyncCb(uv_async_t *async);
void constructUdfService(void *argsThread);
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType);
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle);
int compareUdfcFuncSub(const void* elem1, const void* elem2);
int32_t doTeardownUdf(UdfcFuncHandle handle);
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
SSDataBlock* output, SUdfInterBuf *newState);
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf);
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output);
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
int32_t udfcOpen();
int32_t udfcClose();
int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle);
void releaseUdfFuncHandle(char* udfName);
int32_t cleanUpUdfs();
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
return strcmp(stub1->udfName, stub2->udfName);
}
int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
int32_t code = 0;
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName);
int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (stubIndex != -1) {
SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex);
UdfcFuncHandle handle = foundStub->handle;
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
*pHandle = foundStub->handle;
++foundStub->refCount;
foundStub->lastRefTime = taosGetTimestampUs();
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return 0;
} else {
fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
udfName, foundStub->refCount, foundStub->lastRefTime);
taosArrayRemove(gUdfdProxy.udfStubs, stubIndex);
}
}
*pHandle = NULL;
code = doSetupUdf(udfName, pHandle);
if (code == TSDB_CODE_SUCCESS) {
SUdfcFuncStub stub = {0};
strcpy(stub.udfName, udfName);
stub.handle = *pHandle;
++stub.refCount;
stub.lastRefTime = taosGetTimestampUs();
taosArrayPush(gUdfdProxy.udfStubs, &stub);
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
} else {
*pHandle = NULL;
}
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return code;
}
void releaseUdfFuncHandle(char* udfName) {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName);
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (!foundStub) {
return;
}
if (foundStub->refCount > 0) {
--foundStub->refCount;
}
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
}
int32_t cleanUpUdfs() {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
int32_t i = 0;
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
if (stub->refCount == 0) {
fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle);
} else {
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
taosArrayPush(udfStubs, stub);
} else {
fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
stub->udfName, stub->refCount, stub->lastRefTime);
}
}
++i;
}
taosArrayDestroy(gUdfdProxy.udfStubs);
gUdfdProxy.udfStubs = udfStubs;
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return 0;
}
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
UdfcFuncHandle handle = NULL;
int32_t code = acquireUdfFuncHandle(udfName, &handle);
if (code != 0) {
return code;
}
SUdfcUvSession *session = handle;
code = doCallUdfScalarFunc(handle, input, numOfCols, output);
if (output->columnData == NULL) {
fnError("udfc scalar function calculate error. no column data");
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
} else {
if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) {
fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", session->outputType,
session->outputLen, output->columnData->info.type, output->columnData->info.bytes);
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
}
releaseUdfFuncHandle(udfName);
return code;
}
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
if (fmIsScalarFunc(pFunc->funcId)) {
return false;
}
pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
return true;
}
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) {
if (functionSetup(pCtx, pResultCellInfo) != true) {
return false;
}
UdfcFuncHandle handle;
int32_t udfCode = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
return false;
}
SUdfcUvSession *session = (SUdfcUvSession *)handle;
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
memset(udfRes, 0, envSize);
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SUdfInterBuf buf = {0};
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
releaseUdfFuncHandle(pCtx->udfName);
return false;
}
udfRes->interResNum = buf.numOfResult;
if (buf.bufLen <= session->bufSize) {
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
releaseUdfFuncHandle(pCtx->udfName);
return false;
}
releaseUdfFuncHandle(pCtx->udfName);
freeUdfInterBuf(&buf);
return true;
}
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
int32_t udfCode = 0;
UdfcFuncHandle handle = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode);
return udfCode;
}
SUdfcUvSession *session = handle;
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SInputColumnInfoData* pInput = &pCtx->input;
int32_t numOfCols = pInput->numOfInputCols;
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
SSDataBlock tempBlock = {0};
tempBlock.info.numOfCols = numOfCols;
tempBlock.info.rows = pInput->totalRows;
tempBlock.info.uid = pInput->uid;
bool hasVarCol = false;
tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData *col = pInput->pData[i];
if (IS_VAR_DATA_TYPE(col->info.type)) {
hasVarCol = true;
}
taosArrayPush(tempBlock.pDataBlock, col);
}
tempBlock.info.hasVarCol = hasVarCol;
SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows);
SUdfInterBuf state = {.buf = udfRes->interResBuf,
.bufLen = session->bufSize,
.numOfResult = udfRes->interResNum};
SUdfInterBuf newState = {0};
udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
if (udfCode != 0) {
fnError("udfAggProcess error. code: %d", udfCode);
newState.numOfResult = 0;
} else {
udfRes->interResNum = newState.numOfResult;
if (newState.bufLen <= session->bufSize) {
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
}
}
if (newState.numOfResult == 1 || state.numOfResult == 1) {
GET_RES_INFO(pCtx)->numOfRes = 1;
}
blockDataDestroy(inputBlock);
taosArrayDestroy(tempBlock.pDataBlock);
releaseUdfFuncHandle(pCtx->udfName);
freeUdfInterBuf(&newState);
return udfCode;
}
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
int32_t udfCode = 0;
UdfcFuncHandle handle = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode);
return udfCode;
}
SUdfcUvSession *session = handle;
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SUdfInterBuf resultBuf = {0};
SUdfInterBuf state = {.buf = udfRes->interResBuf,
.bufLen = session->bufSize,
.numOfResult = udfRes->interResNum};
int32_t udfCallCode= 0;
udfCallCode= doCallUdfAggFinalize(session, &state, &resultBuf);
if (udfCallCode != 0) {
fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
GET_RES_INFO(pCtx)->numOfRes = 0;
} else {
if (resultBuf.bufLen <= session->outputLen) {
memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen);
udfRes->finalResNum = resultBuf.numOfResult;
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
} else {
fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->outputLen);
GET_RES_INFO(pCtx)->numOfRes = 0;
udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
}
freeUdfInterBuf(&resultBuf);
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
releaseUdfFuncHandle(pCtx->udfName);
return udfCallCode == 0 ? numOfResults : udfCallCode;
}
void onUdfcPipeClose(uv_handle_t *handle) {
SClientUvConn *conn = handle->data;
if (!QUEUE_EMPTY(&conn->taskQueue)) {
@ -1314,93 +1662,6 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
return err;
}
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
return strcmp(stub1->udfName, stub2->udfName);
}
int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
int32_t code = 0;
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName);
int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (stubIndex != -1) {
SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex);
UdfcFuncHandle handle = foundStub->handle;
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
*pHandle = foundStub->handle;
++foundStub->refCount;
foundStub->lastRefTime = taosGetTimestampUs();
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return 0;
} else {
fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
udfName, foundStub->refCount, foundStub->lastRefTime);
taosArrayRemove(gUdfdProxy.udfStubs, stubIndex);
}
}
*pHandle = NULL;
code = doSetupUdf(udfName, pHandle);
if (code == TSDB_CODE_SUCCESS) {
SUdfcFuncStub stub = {0};
strcpy(stub.udfName, udfName);
stub.handle = *pHandle;
++stub.refCount;
stub.lastRefTime = taosGetTimestampUs();
taosArrayPush(gUdfdProxy.udfStubs, &stub);
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
} else {
*pHandle = NULL;
}
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return code;
}
void releaseUdfFuncHandle(char* udfName) {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName);
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (!foundStub) {
return;
}
if (foundStub->refCount > 0) {
--foundStub->refCount;
}
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
}
int32_t cleanUpUdfs() {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
int32_t i = 0;
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
if (stub->refCount == 0) {
fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle);
} else {
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
taosArrayPush(udfStubs, stub);
} else {
fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
stub->udfName, stub->refCount, stub->lastRefTime);
}
}
++i;
}
taosArrayDestroy(gUdfdProxy.udfStubs);
gUdfdProxy.udfStubs = udfStubs;
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return 0;
}
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
SSDataBlock* output, SUdfInterBuf *newState) {
fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
@ -1524,29 +1785,6 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
return err;
}
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
UdfcFuncHandle handle = NULL;
int32_t code = acquireUdfFuncHandle(udfName, &handle);
if (code != 0) {
return code;
}
SUdfcUvSession *session = handle;
code = doCallUdfScalarFunc(handle, input, numOfCols, output);
if (output->columnData == NULL) {
fnError("udfc scalar function calculate error. no column data");
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
} else {
if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) {
fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", session->outputType,
session->outputLen, output->columnData->info.type, output->columnData->info.bytes);
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
}
releaseUdfFuncHandle(udfName);
return code;
}
int32_t doTeardownUdf(UdfcFuncHandle handle) {
SUdfcUvSession *session = (SUdfcUvSession *) handle;
@ -1576,165 +1814,3 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
return err;
}
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
typedef struct SUdfAggRes {
int8_t finalResNum;
int8_t interResNum;
char* finalResBuf;
char* interResBuf;
} SUdfAggRes;
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
if (fmIsScalarFunc(pFunc->funcId)) {
return false;
}
pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
return true;
}
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) {
if (functionSetup(pCtx, pResultCellInfo) != true) {
return false;
}
UdfcFuncHandle handle;
int32_t udfCode = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
return false;
}
SUdfcUvSession *session = (SUdfcUvSession *)handle;
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
memset(udfRes, 0, envSize);
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SUdfInterBuf buf = {0};
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
releaseUdfFuncHandle(pCtx->udfName);
return false;
}
udfRes->interResNum = buf.numOfResult;
if (buf.bufLen <= session->bufSize) {
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
releaseUdfFuncHandle(pCtx->udfName);
return false;
}
releaseUdfFuncHandle(pCtx->udfName);
freeUdfInterBuf(&buf);
return true;
}
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
int32_t udfCode = 0;
UdfcFuncHandle handle = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode);
return udfCode;
}
SUdfcUvSession *session = handle;
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SInputColumnInfoData* pInput = &pCtx->input;
int32_t numOfCols = pInput->numOfInputCols;
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
SSDataBlock tempBlock = {0};
tempBlock.info.numOfCols = numOfCols;
tempBlock.info.rows = pInput->totalRows;
tempBlock.info.uid = pInput->uid;
bool hasVarCol = false;
tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData *col = pInput->pData[i];
if (IS_VAR_DATA_TYPE(col->info.type)) {
hasVarCol = true;
}
taosArrayPush(tempBlock.pDataBlock, col);
}
tempBlock.info.hasVarCol = hasVarCol;
SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows);
SUdfInterBuf state = {.buf = udfRes->interResBuf,
.bufLen = session->bufSize,
.numOfResult = udfRes->interResNum};
SUdfInterBuf newState = {0};
udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
if (udfCode != 0) {
fnError("udfAggProcess error. code: %d", udfCode);
newState.numOfResult = 0;
} else {
udfRes->interResNum = newState.numOfResult;
if (newState.bufLen <= session->bufSize) {
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
}
}
if (newState.numOfResult == 1 || state.numOfResult == 1) {
GET_RES_INFO(pCtx)->numOfRes = 1;
}
blockDataDestroy(inputBlock);
taosArrayDestroy(tempBlock.pDataBlock);
releaseUdfFuncHandle(pCtx->udfName);
freeUdfInterBuf(&newState);
return udfCode;
}
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
int32_t udfCode = 0;
UdfcFuncHandle handle = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode);
return udfCode;
}
SUdfcUvSession *session = handle;
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SUdfInterBuf resultBuf = {0};
SUdfInterBuf state = {.buf = udfRes->interResBuf,
.bufLen = session->bufSize,
.numOfResult = udfRes->interResNum};
int32_t udfCallCode= 0;
udfCallCode= doCallUdfAggFinalize(session, &state, &resultBuf);
if (udfCallCode != 0) {
fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
GET_RES_INFO(pCtx)->numOfRes = 0;
} else {
if (resultBuf.bufLen <= session->outputLen) {
memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen);
udfRes->finalResNum = resultBuf.numOfResult;
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
} else {
fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->outputLen);
GET_RES_INFO(pCtx)->numOfRes = 0;
udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
}
freeUdfInterBuf(&resultBuf);
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
releaseUdfFuncHandle(pCtx->udfName);
return udfCallCode == 0 ? numOfResults : udfCallCode;
}