From 0d8dcc2972a38508cd00b4506ddc109cd024efff Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 19 Mar 2021 09:51:53 +0800 Subject: [PATCH] [td-3188] --- src/client/src/tscLocalMerge.c | 1 - src/query/src/qExecutor.c | 35 ++++++++++++++++++++++++++++------ 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 6217beff44..779240dac0 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -1073,7 +1073,6 @@ static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool n } pCtx->currentStage = MERGE_STAGE; - if (needInit) { aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo); } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 9f8fe0bcb2..59a2773e87 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -782,7 +782,13 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx } if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunction(&pCtx[k]); +// aAggs[functionId].xFunction(&pCtx[k]); + if (functionId < 0) { + // load the script and exec, pRuntimeEnv->pUdfInfo + + } else { + aAggs[functionId].xFunction(&pCtx[k]); + } } // restore it @@ -1009,7 +1015,13 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction int32_t functionId = pCtx[k].functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { pCtx[k].startTs = startTs;// this can be set during create the struct - aAggs[functionId].xFunction(&pCtx[k]); +// aAggs[functionId].xFunction(&pCtx[k]); + if (functionId < 0) { + // load the script and exec, pRuntimeEnv->pUdfInfo + + } else { + aAggs[functionId].xFunction(&pCtx[k]); + } } } } @@ -1019,7 +1031,13 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC for (int32_t k = 0; k < numOfOutput; ++k) { pCtx[k].startTs = pQuery->window.skey; - aAggs[pCtx[k].functionId].xFunction(&pCtx[k]); + + if (pCtx[k].functionId < 0) { + // load the script and exec, pRuntimeEnv->pUdfInfo + + } else { + aAggs[pCtx[k].functionId].xFunction(&pCtx[k]); + } } } @@ -2480,9 +2498,14 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId; // group by + first/last should not apply the first/last block filter - status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId); - if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { + if (functionId < 0) { + status |= BLK_DATA_ALL_NEEDED; return status; + } else { + status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId); + if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { + return status; + } } } @@ -3060,7 +3083,7 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) { continue; } - if (pCtx[j].functionId < 0) { // udf initialize + if (pCtx[j].functionId < 0) { // todo udf initialization } else { aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);