Merge pull request #24571 from taosdata/szhou/fix/td-28300-2
fix: compute scalar functions before agg in session window
This commit is contained in:
commit
799ff48e68
|
@ -30,6 +30,7 @@
|
||||||
typedef struct SSessionAggOperatorInfo {
|
typedef struct SSessionAggOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo;
|
||||||
SAggSupporter aggSup;
|
SAggSupporter aggSup;
|
||||||
|
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||||
SGroupResInfo groupResInfo;
|
SGroupResInfo groupResInfo;
|
||||||
SWindowRowsSup winSup;
|
SWindowRowsSup winSup;
|
||||||
bool reptScan; // next round scan
|
bool reptScan; // next round scan
|
||||||
|
@ -1407,6 +1408,10 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
|
pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
|
||||||
|
if (pInfo->scalarSupp.pExprInfo != NULL) {
|
||||||
|
SExprSupp* pExprSup = &pInfo->scalarSupp;
|
||||||
|
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
||||||
|
}
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
|
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
|
||||||
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
|
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
|
||||||
|
@ -1531,6 +1536,8 @@ void destroySWindowOperatorInfo(void* param) {
|
||||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||||
|
|
||||||
cleanupAggSup(&pInfo->aggSup);
|
cleanupAggSup(&pInfo->aggSup);
|
||||||
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
|
|
||||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
@ -1570,6 +1577,16 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
|
||||||
pInfo->reptScan = false;
|
pInfo->reptScan = false;
|
||||||
pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
|
pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
|
||||||
pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
|
pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
|
||||||
|
|
||||||
|
if (pSessionNode->window.pExprs != NULL) {
|
||||||
|
int32_t numOfScalar = 0;
|
||||||
|
SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
|
||||||
|
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
|
|
@ -1064,9 +1064,8 @@ static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets,
|
||||||
SNode* pTarget = NULL;
|
SNode* pTarget = NULL;
|
||||||
bool found = false;
|
bool found = false;
|
||||||
FOREACH(pTarget, pTargets) {
|
FOREACH(pTarget, pTargets) {
|
||||||
if ((QUERY_NODE_COLUMN == nodeType(pSortExpr) && nodesEqualNode((SNode*)pSortExpr, pTarget))
|
if ((QUERY_NODE_COLUMN == nodeType(pSortExpr) && nodesEqualNode((SNode*)pSortExpr, pTarget)) ||
|
||||||
// || (0 == strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName))
|
(0 == strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName))) {
|
||||||
) {
|
|
||||||
code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
|
code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -45,6 +45,7 @@ fi
|
||||||
#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4
|
#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4
|
||||||
,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4
|
,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4
|
||||||
|
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_expr.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/project_group.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/project_group.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_interval.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_interval.py
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
from wsgiref.headers import tspecials
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
|
||||||
|
self.rowNum = 10
|
||||||
|
self.batchNum = 5
|
||||||
|
self.ts = 1537146000000
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
dbname = "db"
|
||||||
|
tdSql.prepare()
|
||||||
|
|
||||||
|
tdSql.execute(f'''create table sta(ts timestamp, f int, col2 bigint) tags(tg1 int, tg2 binary(20))''')
|
||||||
|
tdSql.execute(f"create table sta1 using sta tags(1, 'a')")
|
||||||
|
tdSql.execute(f"insert into sta1 values(1537146000001, 11, 110)")
|
||||||
|
tdSql.execute(f"insert into sta1 values(1537146000002, 12, 120)")
|
||||||
|
tdSql.execute(f"insert into sta1 values(1537146000003, 13, 130)")
|
||||||
|
|
||||||
|
tdSql.query("select _wstart, f+100, count(*) from db.sta partition by f+100 session(ts, 1a) order by _wstart");
|
||||||
|
tdSql.checkData(0, 1, 111.0)
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue