fix: compute scalar functions before agg in session window
This commit is contained in:
parent
81c83b67d4
commit
c3c8d7b07d
|
@ -30,6 +30,7 @@
|
||||||
typedef struct SSessionAggOperatorInfo {
|
typedef struct SSessionAggOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo;
|
||||||
SAggSupporter aggSup;
|
SAggSupporter aggSup;
|
||||||
|
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||||
SGroupResInfo groupResInfo;
|
SGroupResInfo groupResInfo;
|
||||||
SWindowRowsSup winSup;
|
SWindowRowsSup winSup;
|
||||||
bool reptScan; // next round scan
|
bool reptScan; // next round scan
|
||||||
|
@ -1407,6 +1408,10 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
|
pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
|
||||||
|
if (pInfo->scalarSupp.pExprInfo != NULL) {
|
||||||
|
SExprSupp* pExprSup = &pInfo->scalarSupp;
|
||||||
|
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
||||||
|
}
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
|
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
|
||||||
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
|
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
|
||||||
|
@ -1570,6 +1575,16 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
|
||||||
pInfo->reptScan = false;
|
pInfo->reptScan = false;
|
||||||
pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
|
pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
|
||||||
pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
|
pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
|
||||||
|
|
||||||
|
if (pSessionNode->window.pExprs != NULL) {
|
||||||
|
int32_t numOfScalar = 0;
|
||||||
|
SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
|
||||||
|
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
|
|
@ -1064,9 +1064,8 @@ static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets,
|
||||||
SNode* pTarget = NULL;
|
SNode* pTarget = NULL;
|
||||||
bool found = false;
|
bool found = false;
|
||||||
FOREACH(pTarget, pTargets) {
|
FOREACH(pTarget, pTargets) {
|
||||||
if ((QUERY_NODE_COLUMN == nodeType(pSortExpr) && nodesEqualNode((SNode*)pSortExpr, pTarget))
|
if ((QUERY_NODE_COLUMN == nodeType(pSortExpr) && nodesEqualNode((SNode*)pSortExpr, pTarget)) ||
|
||||||
// || (0 == strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName))
|
(0 == strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName))) {
|
||||||
) {
|
|
||||||
code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
|
code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue