fix stream progress delay
This commit is contained in:
parent
7e8fdb1b4e
commit
fb72e29fe1
|
@ -454,7 +454,7 @@ pipeline {
|
||||||
cd ${WKC}/tests/parallel_test
|
cd ${WKC}/tests/parallel_test
|
||||||
export DEFAULT_RETRY_TIME=2
|
export DEFAULT_RETRY_TIME=2
|
||||||
date
|
date
|
||||||
''' + timeout_cmd + ''' time ./run.sh -e -m /home/m.json -t cases.task -b ${BRANCH_NAME}_${BUILD_ID} -l ${WKDIR}/log -o 600 ''' + extra_param + '''
|
''' + timeout_cmd + ''' time ./run.sh -e -m /home/m.json -t cases.task -b ${BRANCH_NAME}_${BUILD_ID} -l ${WKDIR}/log -o 900 ''' + extra_param + '''
|
||||||
'''
|
'''
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1130,6 +1130,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
||||||
tsExperimental = cfgGetItem(pCfg, "experimental")->bval;
|
tsExperimental = cfgGetItem(pCfg, "experimental")->bval;
|
||||||
|
|
||||||
tsMultiResultFunctionStarReturnTags = cfgGetItem(pCfg, "multiResultFunctionStarReturnTags")->bval;
|
tsMultiResultFunctionStarReturnTags = cfgGetItem(pCfg, "multiResultFunctionStarReturnTags")->bval;
|
||||||
|
tsMaxTsmaCalcDelay = cfgGetItem(pCfg, "maxTsmaCalcDelay")->i32;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1211,7 +1212,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32;
|
tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32;
|
||||||
tmqRowSize = cfgGetItem(pCfg, "tmqRowSize")->i32;
|
tmqRowSize = cfgGetItem(pCfg, "tmqRowSize")->i32;
|
||||||
tsMaxTsmaNum = cfgGetItem(pCfg, "maxTsmaNum")->i32;
|
tsMaxTsmaNum = cfgGetItem(pCfg, "maxTsmaNum")->i32;
|
||||||
tsMaxTsmaCalcDelay = cfgGetItem(pCfg, "maxTsmaCalcDelay")->i32;
|
|
||||||
|
|
||||||
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
|
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
|
||||||
tsCompactPullupInterval = cfgGetItem(pCfg, "compactPullupInterval")->i32;
|
tsCompactPullupInterval = cfgGetItem(pCfg, "compactPullupInterval")->i32;
|
||||||
|
|
|
@ -45,7 +45,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
|
||||||
// 1. the vnode has already been restored.
|
// 1. the vnode has already been restored.
|
||||||
// 2. the vnode should be the leader.
|
// 2. the vnode should be the leader.
|
||||||
// 3. the stream is not suspended yet.
|
// 3. the stream is not suspended yet.
|
||||||
if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) {
|
if ((!tsDisableStream) && (numOfTasks > 0) /* && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)*/) {
|
||||||
tqScanWalAsync(pTq, true);
|
tqScanWalAsync(pTq, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12645,6 +12645,17 @@ static int32_t buildRenameColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt
|
||||||
if (NULL != getColSchema(pTableMeta, pStmt->newColName)) {
|
if (NULL != getColSchema(pTableMeta, pStmt->newColName)) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_DUPLICATED_COLUMN);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_DUPLICATED_COLUMN);
|
||||||
}
|
}
|
||||||
|
if (TSDB_NORMAL_TABLE == pTableMeta->tableType) {
|
||||||
|
SArray* pTsmas = NULL;
|
||||||
|
SName tbName;
|
||||||
|
int32_t code = 0;
|
||||||
|
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tbName);
|
||||||
|
if (pCxt->pMetaCache) code = getTableTsmasFromCache(pCxt->pMetaCache, &tbName, &pTsmas);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) return code;
|
||||||
|
if (pTsmas && pTsmas->size > 0) {
|
||||||
|
return TSDB_CODE_TSMA_MUST_BE_DROPPED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pReq->colName = taosStrdup(pStmt->colName);
|
pReq->colName = taosStrdup(pStmt->colName);
|
||||||
pReq->colNewName = taosStrdup(pStmt->newColName);
|
pReq->colNewName = taosStrdup(pStmt->newColName);
|
||||||
|
|
|
@ -875,9 +875,7 @@ class TDTestCase:
|
||||||
tdSql.execute('use test')
|
tdSql.execute('use test')
|
||||||
|
|
||||||
def test_modify_col_name_value(self):
|
def test_modify_col_name_value(self):
|
||||||
tdSql.execute('alter table test.norm_tb rename column c1 c1_new')
|
tdSql.error('alter table test.norm_tb rename column c1 c1_new', -2147471088) ## tsma must be dropped
|
||||||
sql = 'select avg(c1_new) from test.norm_tb'
|
|
||||||
self.check([TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma5').ignore_query_table().get_qc()])
|
|
||||||
|
|
||||||
## modify tag name
|
## modify tag name
|
||||||
tdSql.error('alter stable test.meters rename tag t1 t1_new;', -2147482637) ## stream must be dropped
|
tdSql.error('alter stable test.meters rename tag t1 t1_new;', -2147482637) ## stream must be dropped
|
||||||
|
@ -1233,6 +1231,19 @@ class TDTestCase:
|
||||||
# self.test_drop_ctable()
|
# self.test_drop_ctable()
|
||||||
self.test_drop_db()
|
self.test_drop_db()
|
||||||
|
|
||||||
|
def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float):
|
||||||
|
timeout = timeout_in_seconds
|
||||||
|
tdSql.query(sql)
|
||||||
|
while timeout > 0 and tdSql.getRows() != expected_row_num:
|
||||||
|
tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {tdSql.getRows()}, remain: {timeout_in_seconds - timeout}')
|
||||||
|
time.sleep(1)
|
||||||
|
timeout = timeout - 1
|
||||||
|
tdSql.query(sql)
|
||||||
|
if timeout <= 0:
|
||||||
|
tdLog.exit(f'failed to wait query: {sql} to return {expected_row_num} rows timeout: {timeout_in_seconds}s')
|
||||||
|
else:
|
||||||
|
tdLog.debug(f'wait query succeed: {sql} to return {expected_row_num}, got: {tdSql.getRows()}')
|
||||||
|
|
||||||
def test_drop_tsma(self):
|
def test_drop_tsma(self):
|
||||||
function_name = sys._getframe().f_code.co_name
|
function_name = sys._getframe().f_code.co_name
|
||||||
tdLog.debug(f'-----{function_name}------')
|
tdLog.debug(f'-----{function_name}------')
|
||||||
|
@ -1244,6 +1255,7 @@ class TDTestCase:
|
||||||
tdSql.error('drop tsma test.tsma1', -2147482491)
|
tdSql.error('drop tsma test.tsma1', -2147482491)
|
||||||
tdSql.execute('drop tsma test.tsma2', queryTimes=1)
|
tdSql.execute('drop tsma test.tsma2', queryTimes=1)
|
||||||
tdSql.execute('drop tsma test.tsma1', queryTimes=1)
|
tdSql.execute('drop tsma test.tsma1', queryTimes=1)
|
||||||
|
self.wait_query('show transactions', 0, 10)
|
||||||
tdSql.execute('drop database test', queryTimes=1)
|
tdSql.execute('drop database test', queryTimes=1)
|
||||||
|
|
||||||
self.init_data()
|
self.init_data()
|
||||||
|
@ -1284,6 +1296,7 @@ class TDTestCase:
|
||||||
'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096)
|
'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096)
|
||||||
|
|
||||||
tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1)
|
tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1)
|
||||||
|
self.wait_query('show transactions', 0, 10)
|
||||||
tdSql.execute('drop database nsdb')
|
tdSql.execute('drop database nsdb')
|
||||||
|
|
||||||
# drop norm table
|
# drop norm table
|
||||||
|
@ -1297,12 +1310,12 @@ class TDTestCase:
|
||||||
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
|
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
|
||||||
tdSql.execute('alter table test.t0 ttl 2', queryTimes=1)
|
tdSql.execute('alter table test.t0 ttl 2', queryTimes=1)
|
||||||
tdSql.execute('flush database test')
|
tdSql.execute('flush database test')
|
||||||
tdSql.waitedQuery('show test.tables like "%t0"', 0, 10)
|
self.wait_query('show test.tables like "%t0"', 0, 10)
|
||||||
|
|
||||||
# test drop multi tables
|
# test drop multi tables
|
||||||
tdSql.execute('drop table test.t3, test.t4')
|
tdSql.execute('drop table test.t3, test.t4')
|
||||||
tdSql.waitedQuery('show test.tables like "%t3"', 0, 1)
|
self.wait_query('show test.tables like "%t3"', 0, 1)
|
||||||
tdSql.waitedQuery('show test.tables like "%t4"', 0, 1)
|
self.wait_query('show test.tables like "%t4"', 0, 1)
|
||||||
|
|
||||||
tdSql.query('show test.tables like "%tsma%"')
|
tdSql.query('show test.tables like "%tsma%"')
|
||||||
tdSql.checkRows(0)
|
tdSql.checkRows(0)
|
||||||
|
@ -1310,6 +1323,7 @@ class TDTestCase:
|
||||||
# test drop stream
|
# test drop stream
|
||||||
tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first
|
tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first
|
||||||
|
|
||||||
|
self.wait_query('show transactions', 0, 10)
|
||||||
tdSql.execute('drop database test', queryTimes=1)
|
tdSql.execute('drop database test', queryTimes=1)
|
||||||
self.init_data()
|
self.init_data()
|
||||||
|
|
||||||
|
@ -1410,6 +1424,7 @@ class TDTestCase:
|
||||||
|
|
||||||
tdSql.error(
|
tdSql.error(
|
||||||
'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
|
'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
|
||||||
|
self.wait_query('show transactions', 0, 10)
|
||||||
tdSql.execute('drop database nsdb')
|
tdSql.execute('drop database nsdb')
|
||||||
|
|
||||||
def test_create_tsma_on_norm_table(self):
|
def test_create_tsma_on_norm_table(self):
|
||||||
|
|
Loading…
Reference in New Issue