From fb72e29fe1c5c79e84947090d831c3d1606c928b Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Tue, 16 Apr 2024 16:19:35 +0800 Subject: [PATCH] fix stream progress delay --- Jenkinsfile2 | 2 +- source/common/src/tglobal.c | 2 +- source/dnode/vnode/src/tq/tqPush.c | 2 +- source/libs/parser/src/parTranslater.c | 11 +++++++++++ tests/system-test/2-query/tsma.py | 27 ++++++++++++++++++++------ 5 files changed, 35 insertions(+), 9 deletions(-) diff --git a/Jenkinsfile2 b/Jenkinsfile2 index e9372ab686..05ba85b091 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -454,7 +454,7 @@ pipeline { cd ${WKC}/tests/parallel_test export DEFAULT_RETRY_TIME=2 date - ''' + timeout_cmd + ''' time ./run.sh -e -m /home/m.json -t cases.task -b ${BRANCH_NAME}_${BUILD_ID} -l ${WKDIR}/log -o 600 ''' + extra_param + ''' + ''' + timeout_cmd + ''' time ./run.sh -e -m /home/m.json -t cases.task -b ${BRANCH_NAME}_${BUILD_ID} -l ${WKDIR}/log -o 900 ''' + extra_param + ''' ''' } } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 05a8a54c11..1f06891d3d 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1130,6 +1130,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsExperimental = cfgGetItem(pCfg, "experimental")->bval; tsMultiResultFunctionStarReturnTags = cfgGetItem(pCfg, "multiResultFunctionStarReturnTags")->bval; + tsMaxTsmaCalcDelay = cfgGetItem(pCfg, "maxTsmaCalcDelay")->i32; return 0; } @@ -1211,7 +1212,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; tmqRowSize = cfgGetItem(pCfg, "tmqRowSize")->i32; tsMaxTsmaNum = cfgGetItem(pCfg, "maxTsmaNum")->i32; - tsMaxTsmaCalcDelay = cfgGetItem(pCfg, "maxTsmaCalcDelay")->i32; tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsCompactPullupInterval = cfgGetItem(pCfg, "compactPullupInterval")->i32; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 8fee1d5904..9a38776386 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -45,7 +45,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) { // 1. the vnode has already been restored. // 2. the vnode should be the leader. // 3. the stream is not suspended yet. - if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) { + if ((!tsDisableStream) && (numOfTasks > 0) /* && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)*/) { tqScanWalAsync(pTq, true); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 43b67d44e1..da3159432f 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -12645,6 +12645,17 @@ static int32_t buildRenameColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt if (NULL != getColSchema(pTableMeta, pStmt->newColName)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_DUPLICATED_COLUMN); } + if (TSDB_NORMAL_TABLE == pTableMeta->tableType) { + SArray* pTsmas = NULL; + SName tbName; + int32_t code = 0; + toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tbName); + if (pCxt->pMetaCache) code = getTableTsmasFromCache(pCxt->pMetaCache, &tbName, &pTsmas); + if (TSDB_CODE_SUCCESS != code) return code; + if (pTsmas && pTsmas->size > 0) { + return TSDB_CODE_TSMA_MUST_BE_DROPPED; + } + } pReq->colName = taosStrdup(pStmt->colName); pReq->colNewName = taosStrdup(pStmt->newColName); diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 02c34575cb..606faf6312 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -875,9 +875,7 @@ class TDTestCase: tdSql.execute('use test') def test_modify_col_name_value(self): - tdSql.execute('alter table test.norm_tb rename column c1 c1_new') - sql = 'select avg(c1_new) from test.norm_tb' - self.check([TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma5').ignore_query_table().get_qc()]) + tdSql.error('alter table test.norm_tb rename column c1 c1_new', -2147471088) ## tsma must be dropped ## modify tag name tdSql.error('alter stable test.meters rename tag t1 t1_new;', -2147482637) ## stream must be dropped @@ -1233,6 +1231,19 @@ class TDTestCase: # self.test_drop_ctable() self.test_drop_db() + def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float): + timeout = timeout_in_seconds + tdSql.query(sql) + while timeout > 0 and tdSql.getRows() != expected_row_num: + tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {tdSql.getRows()}, remain: {timeout_in_seconds - timeout}') + time.sleep(1) + timeout = timeout - 1 + tdSql.query(sql) + if timeout <= 0: + tdLog.exit(f'failed to wait query: {sql} to return {expected_row_num} rows timeout: {timeout_in_seconds}s') + else: + tdLog.debug(f'wait query succeed: {sql} to return {expected_row_num}, got: {tdSql.getRows()}') + def test_drop_tsma(self): function_name = sys._getframe().f_code.co_name tdLog.debug(f'-----{function_name}------') @@ -1244,6 +1255,7 @@ class TDTestCase: tdSql.error('drop tsma test.tsma1', -2147482491) tdSql.execute('drop tsma test.tsma2', queryTimes=1) tdSql.execute('drop tsma test.tsma1', queryTimes=1) + self.wait_query('show transactions', 0, 10) tdSql.execute('drop database test', queryTimes=1) self.init_data() @@ -1284,6 +1296,7 @@ class TDTestCase: 'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096) tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1) + self.wait_query('show transactions', 0, 10) tdSql.execute('drop database nsdb') # drop norm table @@ -1297,12 +1310,12 @@ class TDTestCase: self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') tdSql.execute('alter table test.t0 ttl 2', queryTimes=1) tdSql.execute('flush database test') - tdSql.waitedQuery('show test.tables like "%t0"', 0, 10) + self.wait_query('show test.tables like "%t0"', 0, 10) # test drop multi tables tdSql.execute('drop table test.t3, test.t4') - tdSql.waitedQuery('show test.tables like "%t3"', 0, 1) - tdSql.waitedQuery('show test.tables like "%t4"', 0, 1) + self.wait_query('show test.tables like "%t3"', 0, 1) + self.wait_query('show test.tables like "%t4"', 0, 1) tdSql.query('show test.tables like "%tsma%"') tdSql.checkRows(0) @@ -1310,6 +1323,7 @@ class TDTestCase: # test drop stream tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first + self.wait_query('show transactions', 0, 10) tdSql.execute('drop database test', queryTimes=1) self.init_data() @@ -1410,6 +1424,7 @@ class TDTestCase: tdSql.error( 'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097) + self.wait_query('show transactions', 0, 10) tdSql.execute('drop database nsdb') def test_create_tsma_on_norm_table(self):