diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index 52cb847b6f..2c7e6216f5 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -109,8 +109,9 @@ int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInter int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); // input: interbuf1, interbuf2 // output: resultBuf -int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, - SUdfInterBuf *resultBuf); +// udf todo: aggmerge +// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, +// SUdfInterBuf *resultBuf); // input: block // output: resultData int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 4efa8764e5..b057194cdb 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -668,8 +668,8 @@ int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) { len += tEncodeDataBlock(buf, &call->block); len += encodeUdfInterBuf(buf, &call->interBuf); } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) { - len += encodeUdfInterBuf(buf, &call->interBuf); - len += encodeUdfInterBuf(buf, &call->interBuf2); + // len += encodeUdfInterBuf(buf, &call->interBuf); + // len += encodeUdfInterBuf(buf, &call->interBuf2); } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) { len += encodeUdfInterBuf(buf, &call->interBuf); } @@ -690,10 +690,10 @@ void *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call) { buf = tDecodeDataBlock(buf, &call->block); buf = decodeUdfInterBuf(buf, &call->interBuf); break; - case TSDB_UDF_CALL_AGG_MERGE: - buf = decodeUdfInterBuf(buf, &call->interBuf); - buf = decodeUdfInterBuf(buf, &call->interBuf2); - break; + // case TSDB_UDF_CALL_AGG_MERGE: + // buf = decodeUdfInterBuf(buf, &call->interBuf); + // buf = decodeUdfInterBuf(buf, &call->interBuf2); + // break; case TSDB_UDF_CALL_AGG_FIN: buf = decodeUdfInterBuf(buf, &call->interBuf); break; @@ -779,9 +779,9 @@ int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) { case TSDB_UDF_CALL_AGG_PROC: len += encodeUdfInterBuf(buf, &callRsp->resultBuf); break; - case TSDB_UDF_CALL_AGG_MERGE: - len += encodeUdfInterBuf(buf, &callRsp->resultBuf); - break; + // case TSDB_UDF_CALL_AGG_MERGE: + // len += encodeUdfInterBuf(buf, &callRsp->resultBuf); + // break; case TSDB_UDF_CALL_AGG_FIN: len += encodeUdfInterBuf(buf, &callRsp->resultBuf); break; @@ -801,9 +801,9 @@ void *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp) { case TSDB_UDF_CALL_AGG_PROC: buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); break; - case TSDB_UDF_CALL_AGG_MERGE: - buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); - break; + // case TSDB_UDF_CALL_AGG_MERGE: + // buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); + // break; case TSDB_UDF_CALL_AGG_FIN: buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); break; @@ -1129,8 +1129,9 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf SSDataBlock *output, SUdfInterBuf *newState); int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf); int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); -int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, - SUdfInterBuf *resultBuf); +// udf todo: aggmerge +// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, +// SUdfInterBuf *resultBuf); int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output); @@ -2176,11 +2177,11 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf req->interBuf = *state; break; } - case TSDB_UDF_CALL_AGG_MERGE: { - req->interBuf = *state; - req->interBuf2 = *state2; - break; - } + // case TSDB_UDF_CALL_AGG_MERGE: { + // req->interBuf = *state; + // req->interBuf2 = *state2; + // break; + // } case TSDB_UDF_CALL_AGG_FIN: { req->interBuf = *state; break; @@ -2205,10 +2206,10 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf *newState = rsp->resultBuf; break; } - case TSDB_UDF_CALL_AGG_MERGE: { - *newState = rsp->resultBuf; - break; - } + // case TSDB_UDF_CALL_AGG_MERGE: { + // *newState = rsp->resultBuf; + // break; + // } case TSDB_UDF_CALL_AGG_FIN: { *newState = rsp->resultBuf; break; @@ -2241,12 +2242,13 @@ int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInter // input: interbuf1, interbuf2 // output: resultBuf -int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, - SUdfInterBuf *resultBuf) { - int8_t callType = TSDB_UDF_CALL_AGG_MERGE; - int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf); - return err; -} +// udf todo: aggmerge +// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, +// SUdfInterBuf *resultBuf) { +// int8_t callType = TSDB_UDF_CALL_AGG_MERGE; +// int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf); +// return err; +// } // input: interBuf // output: resultData diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index e3e4040b3c..ecb24fc77a 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -194,17 +194,17 @@ int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdf } } -int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf, - void *udfCtx) { - TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx); - SUdfCPluginCtx *ctx = udfCtx; - if (ctx->aggMergeFunc) { - return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf); - } else { - fnError("udfd c plugin aggregation merge not implemented"); - return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; - } -} +// int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf, +// void *udfCtx) { +// TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx); +// SUdfCPluginCtx *ctx = udfCtx; +// if (ctx->aggMergeFunc) { +// return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf); +// } else { +// fnError("udfd c plugin aggregation merge not implemented"); +// return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; +// } +// } int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) { TAOS_UDF_CHECK_PTR_RCODE(buf, resultData, udfCtx); @@ -378,7 +378,7 @@ int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc; plugin->udfAggStartFunc = udfdCPluginUdfAggStart; plugin->udfAggProcFunc = udfdCPluginUdfAggProc; - plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge; + // plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge; plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish; SScriptUdfEnvItem items[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}}; @@ -889,19 +889,19 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { break; } - case TSDB_UDF_CALL_AGG_MERGE: { - SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - if (outBuf.buf != NULL) { - code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx); - freeUdfInterBuf(&call->interBuf); - freeUdfInterBuf(&call->interBuf2); - subRsp->resultBuf = outBuf; - } else { - code = terrno; - } - - break; - } + // case TSDB_UDF_CALL_AGG_MERGE: { + // SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; + // if (outBuf.buf != NULL) { + // code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx); + // freeUdfInterBuf(&call->interBuf); + // freeUdfInterBuf(&call->interBuf2); + // subRsp->resultBuf = outBuf; + // } else { + // code = terrno; + // } + // + // break; + // } case TSDB_UDF_CALL_AGG_FIN: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; if (outBuf.buf != NULL) { @@ -959,10 +959,10 @@ _exit: freeUdfInterBuf(&subRsp->resultBuf); break; } - case TSDB_UDF_CALL_AGG_MERGE: { - freeUdfInterBuf(&subRsp->resultBuf); - break; - } + // case TSDB_UDF_CALL_AGG_MERGE: { + // freeUdfInterBuf(&subRsp->resultBuf); + // break; + // } case TSDB_UDF_CALL_AGG_FIN: { freeUdfInterBuf(&subRsp->resultBuf); break;