feat: support udf merge in udfd

This commit is contained in:
slzhou 2022-08-26 14:27:15 +08:00
parent 08b13cddcf
commit f3890f7421
3 changed files with 24 additions and 5 deletions

View File

@ -256,8 +256,9 @@ static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentR
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf); typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf); typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData); typedef int32_t (*TUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -1706,8 +1706,8 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
} }
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) { void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) {
SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock)); SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock*));
taosArrayPush(dataBlocks, pBlock); taosArrayPush(dataBlocks, &pBlock);
blockDebugShowDataBlocks(dataBlocks, flag); blockDebugShowDataBlocks(dataBlocks, flag);
taosArrayDestroy(dataBlocks); taosArrayDestroy(dataBlocks);
} }

View File

@ -84,6 +84,7 @@ typedef struct SUdf {
TUdfAggStartFunc aggStartFunc; TUdfAggStartFunc aggStartFunc;
TUdfAggProcessFunc aggProcFunc; TUdfAggProcessFunc aggProcFunc;
TUdfAggFinishFunc aggFinishFunc; TUdfAggFinishFunc aggFinishFunc;
TUdfAggMergeFunc aggMergeFunc;
TUdfInitFunc initFunc; TUdfInitFunc initFunc;
TUdfDestroyFunc destroyFunc; TUdfDestroyFunc destroyFunc;
@ -271,6 +272,15 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
break; break;
} }
case TSDB_UDF_CALL_AGG_MERGE: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf);
freeUdfInterBuf(&call->interBuf);
freeUdfInterBuf(&call->interBuf2);
subRsp->resultBuf = outBuf;
break;
}
case TSDB_UDF_CALL_AGG_FIN: { case TSDB_UDF_CALL_AGG_FIN: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggFinishFunc(&call->interBuf, &outBuf); code = udf->aggFinishFunc(&call->interBuf, &outBuf);
@ -309,6 +319,10 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
freeUdfInterBuf(&subRsp->resultBuf); freeUdfInterBuf(&subRsp->resultBuf);
break; break;
} }
case TSDB_UDF_CALL_AGG_MERGE: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_FIN: { case TSDB_UDF_CALL_AGG_FIN: {
freeUdfInterBuf(&subRsp->resultBuf); freeUdfInterBuf(&subRsp->resultBuf);
break; break;
@ -560,7 +574,11 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
strncpy(finishFuncName, processFuncName, strlen(processFuncName)); strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc)); uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
// TODO: merge char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *mergeSuffix = "_merge";
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncat(finishFuncName, mergeSuffix, strlen(mergeSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggMergeFunc));
} }
return 0; return 0;
} }