diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 8dfd03622f..8b50465b5f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -694,6 +694,8 @@ typedef struct { // 3.0.5. int64_t checkpointId; + + int32_t indexForMultiAggBalance; char reserve[256]; } SStreamObj; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 47461d7c1b..80f77bc9a5 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -181,20 +181,39 @@ int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen); } -// todo random choose a node to do compute -SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { +// random choose a node to do compute +SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) { + SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->sourceDb); + if (pDbObj == NULL) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return NULL; + } + + if(pStream->indexForMultiAggBalance == -1){ + taosSeedRand(taosSafeRand()); + pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups; + } + + int32_t index = 0; void* pIter = NULL; SVgObj* pVgroup = NULL; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != dbUid) { + if (pVgroup->dbUid != pStream->sourceDbUid) { sdbRelease(pMnode->pSdb, pVgroup); continue; } - sdbCancelFetch(pMnode->pSdb, pIter); - return pVgroup; + if (index++ == pStream->indexForMultiAggBalance){ + pStream->indexForMultiAggBalance++; + pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups; + sdbCancelFetch(pMnode->pSdb, pIter); + break; + } + sdbRelease(pMnode->pSdb, pVgroup); } + sdbRelease(pMnode->pSdb, pDbObj); + return pVgroup; } @@ -440,10 +459,10 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S if (tsDeployOnSnode) { pSnode = mndSchedFetchOneSnode(pMnode); if (pSnode == NULL) { - pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); + pVgroup = mndSchedFetchOneVg(pMnode, pStream); } } else { - pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); + pVgroup = mndSchedFetchOneVg(pMnode, pStream); } code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 44842084c5..34ea7109d0 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -566,6 +566,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE; streamObj.conf.triggerParam = pCreate->maxDelay; streamObj.ast = taosStrdup(smaObj.ast); + streamObj.indexForMultiAggBalance = -1; // check the maxDelay if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f8fba143f8..50450f0cf2 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -363,6 +363,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pObj->updateTime = pObj->createTime; pObj->version = 1; pObj->smaId = 0; + pObj->indexForMultiAggBalance = -1; pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name)); diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index ef131b4cb3..cae500d7b9 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -768,51 +768,6 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo return code; } -//static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) { -// SLogicNode* pPartWindow = NULL; -// int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); -// if (TSDB_CODE_SUCCESS == code) { -// ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI; -// ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL; -// code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow); -// } -// if (TSDB_CODE_SUCCESS == code) { -// code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, -// (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT)); -// } -// pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; -// ++(pCxt->groupId); -// return code; -//} - -static bool isStreamMultiAgg(SLogicNode* pNode) { - if(LIST_LENGTH(pNode->pChildren) <= 0) return false; - - SNode* pChild = nodesListGetNode(pNode->pChildren, 0); - if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) { - qDebug("vgroups:%d", ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups); - return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups > tsStreamAggCnt; - } - return false; -} - -static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - SLogicNode* pPartWindow = NULL; - int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); - if (TSDB_CODE_SUCCESS == code) { - ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI; - ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL; - code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow); - } - if (TSDB_CODE_SUCCESS == code) { - code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, - (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT)); - } - pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; - ++(pCxt->groupId); - return code; -} - static int32_t stbSplSplitIntervalForStreamMultiAgg(SSplitContext* pCxt, SStableSplitInfo* pInfo) { SLogicNode* pPartWindow = NULL; SLogicNode* pMidWindow = NULL; @@ -845,11 +800,7 @@ static int32_t stbSplSplitIntervalForStreamMultiAgg(SSplitContext* pCxt, SStable static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) { if (pCxt->pPlanCxt->streamQuery) { -// if(isStreamMultiAgg(pInfo->pSplitNode)){ - return stbSplSplitIntervalForStreamMultiAgg(pCxt, pInfo); -// }else{ -// return stbSplSplitIntervalForStream(pCxt, pInfo); -// } + return stbSplSplitIntervalForStreamMultiAgg(pCxt, pInfo); } else { return stbSplSplitIntervalForBatch(pCxt, pInfo); } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 8edfb352ab..0c66d0d1ed 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -9,6 +9,7 @@ ,,y,army,./pytest.sh python3 ./test.py -f empty.py #system test +,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_multi_agg.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_basic.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/scalar_function.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_interval.py diff --git a/tests/system-test/8-stream/stream_multi_agg.py b/tests/system-test/8-stream/stream_multi_agg.py new file mode 100644 index 0000000000..5c478d7e14 --- /dev/null +++ b/tests/system-test/8-stream/stream_multi_agg.py @@ -0,0 +1,98 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + + +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * +from util.sqlset import * +from util.autogen import * + +import random +import time +import traceback +import os +from os import path + + +class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0, 'streamAggCnt': 2} + # init + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + + def case1(self): + tdLog.debug("========case1 start========") + + os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 12 > /dev/null 2>&1 &") + time.sleep(4) + tdSql.query("use test") + tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") + tdLog.debug("========create stream useing snode and insert data ok========") + time.sleep(15) + + tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart") + rowCnt = tdSql.getRows() + results = [] + for i in range(rowCnt): + results.append(tdSql.getData(i,1)) + + tdSql.query("select * from st1 order by groupid,_wstart") + tdSql.checkRows(rowCnt) + for i in range(rowCnt): + data1 = tdSql.getData(i,1) + data2 = results[i] + if data1 != data2: + tdLog.info("num: %d, act data: %d, expect data: %d"%(i, data1, data2)) + tdLog.exit("check data error!") + + tdLog.debug("case1 end") + + def case2(self): + tdLog.debug("========case2 start========") + + os.system("taosBenchmark -d db -t 20 -v 12 -n 1000 -y > /dev/null 2>&1") + # create stream + tdSql.execute("use db") + tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True) + sql = "select count(*) from sta" + # loop wait max 60s to check count is ok + tdLog.info("loop wait result ...") + tdSql.checkDataLoop(0, 0, 99, sql, loopCount=120, waitTime=0.5) + + # check all data is correct + sql = "select * from sta where cnt != 200;" + tdSql.query(sql) + tdSql.checkRows(0) + + # check ts interval is correct + sql = "select * from ( select diff(_wstart) as tsdif from sta ) where tsdif != 10;" + tdSql.query(sql) + tdSql.checkRows(0) + tdLog.debug("case2 end") + +# run + def run(self): + self.case1() + self.case2() + + # stop + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file