diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 5e84b87a81..2b2063e3f6 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -256,8 +256,9 @@ static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentR typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf); -typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf); -typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData); +typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf); +typedef int32_t (*TUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf); +typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData); #ifdef __cplusplus } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index c65e966046..c748260918 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1706,8 +1706,8 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { } void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) { - SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock)); - taosArrayPush(dataBlocks, pBlock); + SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock*)); + taosArrayPush(dataBlocks, &pBlock); blockDebugShowDataBlocks(dataBlocks, flag); taosArrayDestroy(dataBlocks); } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 1cbc78df48..5b27e030b9 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -84,6 +84,7 @@ typedef struct SUdf { TUdfAggStartFunc aggStartFunc; TUdfAggProcessFunc aggProcFunc; TUdfAggFinishFunc aggFinishFunc; + TUdfAggMergeFunc aggMergeFunc; TUdfInitFunc initFunc; TUdfDestroyFunc destroyFunc; @@ -271,6 +272,15 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { break; } + case TSDB_UDF_CALL_AGG_MERGE: { + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; + code = udf->aggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf); + freeUdfInterBuf(&call->interBuf); + freeUdfInterBuf(&call->interBuf2); + subRsp->resultBuf = outBuf; + + break; + } case TSDB_UDF_CALL_AGG_FIN: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; code = udf->aggFinishFunc(&call->interBuf, &outBuf); @@ -309,6 +319,10 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { freeUdfInterBuf(&subRsp->resultBuf); break; } + case TSDB_UDF_CALL_AGG_MERGE: { + freeUdfInterBuf(&subRsp->resultBuf); + break; + } case TSDB_UDF_CALL_AGG_FIN: { freeUdfInterBuf(&subRsp->resultBuf); break; @@ -560,7 +574,11 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { strncpy(finishFuncName, processFuncName, strlen(processFuncName)); strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc)); - // TODO: merge + char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char *mergeSuffix = "_merge"; + strncpy(finishFuncName, processFuncName, strlen(processFuncName)); + strncat(finishFuncName, mergeSuffix, strlen(mergeSuffix)); + uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggMergeFunc)); } return 0; }