[td-3188]
This commit is contained in:
parent
18cb7b36fc
commit
0d8dcc2972
|
@ -1073,7 +1073,6 @@ static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool n
|
||||||
}
|
}
|
||||||
|
|
||||||
pCtx->currentStage = MERGE_STAGE;
|
pCtx->currentStage = MERGE_STAGE;
|
||||||
|
|
||||||
if (needInit) {
|
if (needInit) {
|
||||||
aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo);
|
aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo);
|
||||||
}
|
}
|
||||||
|
|
|
@ -782,7 +782,13 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
|
||||||
}
|
}
|
||||||
|
|
||||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
||||||
aAggs[functionId].xFunction(&pCtx[k]);
|
// aAggs[functionId].xFunction(&pCtx[k]);
|
||||||
|
if (functionId < 0) {
|
||||||
|
// load the script and exec, pRuntimeEnv->pUdfInfo
|
||||||
|
|
||||||
|
} else {
|
||||||
|
aAggs[functionId].xFunction(&pCtx[k]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// restore it
|
// restore it
|
||||||
|
@ -1009,7 +1015,13 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
|
||||||
int32_t functionId = pCtx[k].functionId;
|
int32_t functionId = pCtx[k].functionId;
|
||||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
||||||
pCtx[k].startTs = startTs;// this can be set during create the struct
|
pCtx[k].startTs = startTs;// this can be set during create the struct
|
||||||
aAggs[functionId].xFunction(&pCtx[k]);
|
// aAggs[functionId].xFunction(&pCtx[k]);
|
||||||
|
if (functionId < 0) {
|
||||||
|
// load the script and exec, pRuntimeEnv->pUdfInfo
|
||||||
|
|
||||||
|
} else {
|
||||||
|
aAggs[functionId].xFunction(&pCtx[k]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1019,7 +1031,13 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC
|
||||||
|
|
||||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||||
pCtx[k].startTs = pQuery->window.skey;
|
pCtx[k].startTs = pQuery->window.skey;
|
||||||
aAggs[pCtx[k].functionId].xFunction(&pCtx[k]);
|
|
||||||
|
if (pCtx[k].functionId < 0) {
|
||||||
|
// load the script and exec, pRuntimeEnv->pUdfInfo
|
||||||
|
|
||||||
|
} else {
|
||||||
|
aAggs[pCtx[k].functionId].xFunction(&pCtx[k]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2480,9 +2498,14 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData
|
||||||
int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId;
|
int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId;
|
||||||
|
|
||||||
// group by + first/last should not apply the first/last block filter
|
// group by + first/last should not apply the first/last block filter
|
||||||
status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
|
if (functionId < 0) {
|
||||||
if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
|
status |= BLK_DATA_ALL_NEEDED;
|
||||||
return status;
|
return status;
|
||||||
|
} else {
|
||||||
|
status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
|
||||||
|
if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3060,7 +3083,7 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCtx[j].functionId < 0) { // udf initialize
|
if (pCtx[j].functionId < 0) { // todo udf initialization
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
|
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
|
||||||
|
|
Loading…
Reference in New Issue