udf merge: temporarily block code

This commit is contained in:
factosea 2024-12-19 15:43:24 +08:00
parent 50dbfdd4a3
commit 1cb0e2fec2
3 changed files with 63 additions and 60 deletions

View File

@ -109,8 +109,9 @@ int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInter
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
// input: interbuf1, interbuf2
// output: resultBuf
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
SUdfInterBuf *resultBuf);
// udf todo: aggmerge
// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
// SUdfInterBuf *resultBuf);
// input: block
// output: resultData
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);

View File

@ -668,8 +668,8 @@ int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
len += tEncodeDataBlock(buf, &call->block);
len += encodeUdfInterBuf(buf, &call->interBuf);
} else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
len += encodeUdfInterBuf(buf, &call->interBuf);
len += encodeUdfInterBuf(buf, &call->interBuf2);
// len += encodeUdfInterBuf(buf, &call->interBuf);
// len += encodeUdfInterBuf(buf, &call->interBuf2);
} else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
len += encodeUdfInterBuf(buf, &call->interBuf);
}
@ -690,10 +690,10 @@ void *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call) {
buf = tDecodeDataBlock(buf, &call->block);
buf = decodeUdfInterBuf(buf, &call->interBuf);
break;
case TSDB_UDF_CALL_AGG_MERGE:
buf = decodeUdfInterBuf(buf, &call->interBuf);
buf = decodeUdfInterBuf(buf, &call->interBuf2);
break;
// case TSDB_UDF_CALL_AGG_MERGE:
// buf = decodeUdfInterBuf(buf, &call->interBuf);
// buf = decodeUdfInterBuf(buf, &call->interBuf2);
// break;
case TSDB_UDF_CALL_AGG_FIN:
buf = decodeUdfInterBuf(buf, &call->interBuf);
break;
@ -779,9 +779,9 @@ int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
case TSDB_UDF_CALL_AGG_PROC:
len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
case TSDB_UDF_CALL_AGG_MERGE:
len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
// case TSDB_UDF_CALL_AGG_MERGE:
// len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
// break;
case TSDB_UDF_CALL_AGG_FIN:
len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
@ -801,9 +801,9 @@ void *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp) {
case TSDB_UDF_CALL_AGG_PROC:
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
case TSDB_UDF_CALL_AGG_MERGE:
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
// case TSDB_UDF_CALL_AGG_MERGE:
// buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
// break;
case TSDB_UDF_CALL_AGG_FIN:
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
@ -1129,8 +1129,9 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
SSDataBlock *output, SUdfInterBuf *newState);
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
SUdfInterBuf *resultBuf);
// udf todo: aggmerge
// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
// SUdfInterBuf *resultBuf);
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
@ -2176,11 +2177,11 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
req->interBuf = *state;
break;
}
case TSDB_UDF_CALL_AGG_MERGE: {
req->interBuf = *state;
req->interBuf2 = *state2;
break;
}
// case TSDB_UDF_CALL_AGG_MERGE: {
// req->interBuf = *state;
// req->interBuf2 = *state2;
// break;
// }
case TSDB_UDF_CALL_AGG_FIN: {
req->interBuf = *state;
break;
@ -2205,10 +2206,10 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
*newState = rsp->resultBuf;
break;
}
case TSDB_UDF_CALL_AGG_MERGE: {
*newState = rsp->resultBuf;
break;
}
// case TSDB_UDF_CALL_AGG_MERGE: {
// *newState = rsp->resultBuf;
// break;
// }
case TSDB_UDF_CALL_AGG_FIN: {
*newState = rsp->resultBuf;
break;
@ -2241,12 +2242,13 @@ int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInter
// input: interbuf1, interbuf2
// output: resultBuf
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
SUdfInterBuf *resultBuf) {
int8_t callType = TSDB_UDF_CALL_AGG_MERGE;
int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
return err;
}
// udf todo: aggmerge
// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
// SUdfInterBuf *resultBuf) {
// int8_t callType = TSDB_UDF_CALL_AGG_MERGE;
// int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
// return err;
// }
// input: interBuf
// output: resultData

View File

@ -194,17 +194,17 @@ int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdf
}
}
int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf,
void *udfCtx) {
TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx);
SUdfCPluginCtx *ctx = udfCtx;
if (ctx->aggMergeFunc) {
return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf);
} else {
fnError("udfd c plugin aggregation merge not implemented");
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
}
}
// int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf,
// void *udfCtx) {
// TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx);
// SUdfCPluginCtx *ctx = udfCtx;
// if (ctx->aggMergeFunc) {
// return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf);
// } else {
// fnError("udfd c plugin aggregation merge not implemented");
// return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
// }
// }
int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) {
TAOS_UDF_CHECK_PTR_RCODE(buf, resultData, udfCtx);
@ -378,7 +378,7 @@ int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc;
plugin->udfAggStartFunc = udfdCPluginUdfAggStart;
plugin->udfAggProcFunc = udfdCPluginUdfAggProc;
plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge;
// plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge;
plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish;
SScriptUdfEnvItem items[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}};
@ -889,19 +889,19 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
break;
}
case TSDB_UDF_CALL_AGG_MERGE: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
if (outBuf.buf != NULL) {
code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx);
freeUdfInterBuf(&call->interBuf);
freeUdfInterBuf(&call->interBuf2);
subRsp->resultBuf = outBuf;
} else {
code = terrno;
}
break;
}
// case TSDB_UDF_CALL_AGG_MERGE: {
// SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
// if (outBuf.buf != NULL) {
// code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx);
// freeUdfInterBuf(&call->interBuf);
// freeUdfInterBuf(&call->interBuf2);
// subRsp->resultBuf = outBuf;
// } else {
// code = terrno;
// }
//
// break;
// }
case TSDB_UDF_CALL_AGG_FIN: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
if (outBuf.buf != NULL) {
@ -959,10 +959,10 @@ _exit:
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_MERGE: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
// case TSDB_UDF_CALL_AGG_MERGE: {
// freeUdfInterBuf(&subRsp->resultBuf);
// break;
// }
case TSDB_UDF_CALL_AGG_FIN: {
freeUdfInterBuf(&subRsp->resultBuf);
break;