fix:remove useless code in split logic plan & random choose vnode in stream task & add test case
This commit is contained in:
parent
b0c71f36b1
commit
78c8ebae7f
|
@ -694,6 +694,8 @@ typedef struct {
|
||||||
|
|
||||||
// 3.0.5.
|
// 3.0.5.
|
||||||
int64_t checkpointId;
|
int64_t checkpointId;
|
||||||
|
|
||||||
|
int32_t indexForMultiAggBalance;
|
||||||
char reserve[256];
|
char reserve[256];
|
||||||
|
|
||||||
} SStreamObj;
|
} SStreamObj;
|
||||||
|
|
|
@ -181,20 +181,39 @@ int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan*
|
||||||
return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen);
|
return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo random choose a node to do compute
|
// random choose a node to do compute
|
||||||
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
|
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
|
SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->sourceDb);
|
||||||
|
if (pDbObj == NULL) {
|
||||||
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(pStream->indexForMultiAggBalance == -1){
|
||||||
|
taosSeedRand(taosSafeRand());
|
||||||
|
pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t index = 0;
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
SVgObj* pVgroup = NULL;
|
SVgObj* pVgroup = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
if (pVgroup->dbUid != dbUid) {
|
if (pVgroup->dbUid != pStream->sourceDbUid) {
|
||||||
sdbRelease(pMnode->pSdb, pVgroup);
|
sdbRelease(pMnode->pSdb, pVgroup);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
if (index++ == pStream->indexForMultiAggBalance){
|
||||||
return pVgroup;
|
pStream->indexForMultiAggBalance++;
|
||||||
|
pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
|
||||||
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
sdbRelease(pMnode->pSdb, pVgroup);
|
||||||
}
|
}
|
||||||
|
sdbRelease(pMnode->pSdb, pDbObj);
|
||||||
|
|
||||||
return pVgroup;
|
return pVgroup;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -440,10 +459,10 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S
|
||||||
if (tsDeployOnSnode) {
|
if (tsDeployOnSnode) {
|
||||||
pSnode = mndSchedFetchOneSnode(pMnode);
|
pSnode = mndSchedFetchOneSnode(pMnode);
|
||||||
if (pSnode == NULL) {
|
if (pSnode == NULL) {
|
||||||
pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
|
pVgroup = mndSchedFetchOneVg(pMnode, pStream);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
|
pVgroup = mndSchedFetchOneVg(pMnode, pStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false);
|
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false);
|
||||||
|
|
|
@ -566,6 +566,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
|
streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
|
||||||
streamObj.conf.triggerParam = pCreate->maxDelay;
|
streamObj.conf.triggerParam = pCreate->maxDelay;
|
||||||
streamObj.ast = taosStrdup(smaObj.ast);
|
streamObj.ast = taosStrdup(smaObj.ast);
|
||||||
|
streamObj.indexForMultiAggBalance = -1;
|
||||||
|
|
||||||
// check the maxDelay
|
// check the maxDelay
|
||||||
if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
|
if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
|
||||||
|
|
|
@ -363,6 +363,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
||||||
pObj->updateTime = pObj->createTime;
|
pObj->updateTime = pObj->createTime;
|
||||||
pObj->version = 1;
|
pObj->version = 1;
|
||||||
pObj->smaId = 0;
|
pObj->smaId = 0;
|
||||||
|
pObj->indexForMultiAggBalance = -1;
|
||||||
|
|
||||||
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
|
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
|
||||||
|
|
||||||
|
|
|
@ -768,51 +768,6 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
//static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
|
||||||
// SLogicNode* pPartWindow = NULL;
|
|
||||||
// int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
|
|
||||||
// if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
// ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
|
|
||||||
// ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
|
|
||||||
// code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
|
|
||||||
// }
|
|
||||||
// if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
// code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
|
||||||
// (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
|
|
||||||
// }
|
|
||||||
// pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
|
||||||
// ++(pCxt->groupId);
|
|
||||||
// return code;
|
|
||||||
//}
|
|
||||||
|
|
||||||
static bool isStreamMultiAgg(SLogicNode* pNode) {
|
|
||||||
if(LIST_LENGTH(pNode->pChildren) <= 0) return false;
|
|
||||||
|
|
||||||
SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
|
|
||||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
|
|
||||||
qDebug("vgroups:%d", ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups);
|
|
||||||
return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups > tsStreamAggCnt;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
|
||||||
SLogicNode* pPartWindow = NULL;
|
|
||||||
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
|
|
||||||
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
|
|
||||||
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
|
||||||
(SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
|
|
||||||
}
|
|
||||||
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
|
||||||
++(pCxt->groupId);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t stbSplSplitIntervalForStreamMultiAgg(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
static int32_t stbSplSplitIntervalForStreamMultiAgg(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||||
SLogicNode* pPartWindow = NULL;
|
SLogicNode* pPartWindow = NULL;
|
||||||
SLogicNode* pMidWindow = NULL;
|
SLogicNode* pMidWindow = NULL;
|
||||||
|
@ -845,11 +800,7 @@ static int32_t stbSplSplitIntervalForStreamMultiAgg(SSplitContext* pCxt, SStable
|
||||||
|
|
||||||
static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||||
if (pCxt->pPlanCxt->streamQuery) {
|
if (pCxt->pPlanCxt->streamQuery) {
|
||||||
// if(isStreamMultiAgg(pInfo->pSplitNode)){
|
return stbSplSplitIntervalForStreamMultiAgg(pCxt, pInfo);
|
||||||
return stbSplSplitIntervalForStreamMultiAgg(pCxt, pInfo);
|
|
||||||
// }else{
|
|
||||||
// return stbSplSplitIntervalForStream(pCxt, pInfo);
|
|
||||||
// }
|
|
||||||
} else {
|
} else {
|
||||||
return stbSplSplitIntervalForBatch(pCxt, pInfo);
|
return stbSplSplitIntervalForBatch(pCxt, pInfo);
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f empty.py
|
,,y,army,./pytest.sh python3 ./test.py -f empty.py
|
||||||
|
|
||||||
#system test
|
#system test
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_multi_agg.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_basic.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_basic.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/scalar_function.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/scalar_function.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_interval.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_interval.py
|
||||||
|
|
|
@ -0,0 +1,98 @@
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.common import *
|
||||||
|
from util.sqlset import *
|
||||||
|
from util.autogen import *
|
||||||
|
|
||||||
|
import random
|
||||||
|
import time
|
||||||
|
import traceback
|
||||||
|
import os
|
||||||
|
from os import path
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
updatecfgDict = {'debugFlag': 135, 'asynclog': 0, 'streamAggCnt': 2}
|
||||||
|
# init
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor(), True)
|
||||||
|
|
||||||
|
def case1(self):
|
||||||
|
tdLog.debug("========case1 start========")
|
||||||
|
|
||||||
|
os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 12 > /dev/null 2>&1 &")
|
||||||
|
time.sleep(4)
|
||||||
|
tdSql.query("use test")
|
||||||
|
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
|
||||||
|
tdLog.debug("========create stream useing snode and insert data ok========")
|
||||||
|
time.sleep(15)
|
||||||
|
|
||||||
|
tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart")
|
||||||
|
rowCnt = tdSql.getRows()
|
||||||
|
results = []
|
||||||
|
for i in range(rowCnt):
|
||||||
|
results.append(tdSql.getData(i,1))
|
||||||
|
|
||||||
|
tdSql.query("select * from st1 order by groupid,_wstart")
|
||||||
|
tdSql.checkRows(rowCnt)
|
||||||
|
for i in range(rowCnt):
|
||||||
|
data1 = tdSql.getData(i,1)
|
||||||
|
data2 = results[i]
|
||||||
|
if data1 != data2:
|
||||||
|
tdLog.info("num: %d, act data: %d, expect data: %d"%(i, data1, data2))
|
||||||
|
tdLog.exit("check data error!")
|
||||||
|
|
||||||
|
tdLog.debug("case1 end")
|
||||||
|
|
||||||
|
def case2(self):
|
||||||
|
tdLog.debug("========case2 start========")
|
||||||
|
|
||||||
|
os.system("taosBenchmark -d db -t 20 -v 12 -n 1000 -y > /dev/null 2>&1")
|
||||||
|
# create stream
|
||||||
|
tdSql.execute("use db")
|
||||||
|
tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
|
||||||
|
sql = "select count(*) from sta"
|
||||||
|
# loop wait max 60s to check count is ok
|
||||||
|
tdLog.info("loop wait result ...")
|
||||||
|
tdSql.checkDataLoop(0, 0, 99, sql, loopCount=120, waitTime=0.5)
|
||||||
|
|
||||||
|
# check all data is correct
|
||||||
|
sql = "select * from sta where cnt != 200;"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
# check ts interval is correct
|
||||||
|
sql = "select * from ( select diff(_wstart) as tsdif from sta ) where tsdif != 10;"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
tdLog.debug("case2 end")
|
||||||
|
|
||||||
|
# run
|
||||||
|
def run(self):
|
||||||
|
self.case1()
|
||||||
|
self.case2()
|
||||||
|
|
||||||
|
# stop
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue