fix(stream):fix memory leak and update test cases.

This commit is contained in:
Haojun Liao 2024-06-11 16:34:47 +08:00
parent 3b537367c9
commit 781b644a8c
4 changed files with 84 additions and 70 deletions

View File

@ -1154,6 +1154,7 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
} }
removeTasksInBuf(pInvalidList, &execInfo); removeTasksInBuf(pInvalidList, &execInfo);
taosArrayDestroy(pInvalidList);
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
return ready ? 0 : -1; return ready ? 0 : -1;

View File

@ -53,8 +53,22 @@
"sample_format": "csv", "sample_format": "csv",
"sample_file": "./sample.csv", "sample_file": "./sample.csv",
"tags_file": "", "tags_file": "",
"columns": [{ "type": "INT","count": 4093}], "columns": [
"tags": [{"type": "TINYINT", "count": 1},{"type": "NCHAR","count": 1}] {
"type": "INT",
"count": 4093
}
],
"tags": [
{
"type": "TINYINT",
"count": 1
},
{
"type": "NCHAR",
"count": 1
}
]
} }
] ]
} }

View File

@ -1270,15 +1270,14 @@ class TDTestCase:
def test_drop_tsma(self): def test_drop_tsma(self):
function_name = sys._getframe().f_code.co_name function_name = sys._getframe().f_code.co_name
tdLog.debug(f'-----{function_name}------') tdLog.debug(f'-----{function_name}------')
self.create_tsma('tsma1', 'test', 'meters', [ self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
'avg(c1)', 'avg(c2)'], '5m')
self.create_recursive_tsma('tsma1', 'tsma2', 'test', '15m', 'meters') self.create_recursive_tsma('tsma1', 'tsma2', 'test', '15m', 'meters')
# drop recursive tsma first # drop recursive tsma first
tdSql.error('drop tsma test.tsma1', -2147482491) tdSql.error('drop tsma test.tsma1', -2147482491)
tdSql.execute('drop tsma test.tsma2', queryTimes=1) tdSql.execute('drop tsma test.tsma2', queryTimes=1)
tdSql.execute('drop tsma test.tsma1', queryTimes=1) tdSql.execute('drop tsma test.tsma1', queryTimes=1)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u')
tdSql.execute('drop database test', queryTimes=1) tdSql.execute('drop database test', queryTimes=1)
self.init_data() self.init_data()
@ -1319,7 +1318,7 @@ class TDTestCase:
'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096) 'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096)
tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1) tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u')
tdSql.execute('drop database nsdb') tdSql.execute('drop database nsdb')
# drop norm table # drop norm table
@ -1346,7 +1345,7 @@ class TDTestCase:
# test drop stream # test drop stream
tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u')
tdSql.execute('drop database test', queryTimes=1) tdSql.execute('drop database test', queryTimes=1)
self.init_data() self.init_data()
@ -1449,7 +1448,7 @@ class TDTestCase:
tdSql.error( tdSql.error(
'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097) 'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u')
tdSql.execute('drop database nsdb') tdSql.execute('drop database nsdb')
def test_create_tsma_on_norm_table(self): def test_create_tsma_on_norm_table(self):

View File

@ -68,7 +68,7 @@ class TDTestCase:
# create stream # create stream
tdSql.execute("use db", queryTimes=100) tdSql.execute("use db", queryTimes=100)
tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True) tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
time.sleep(5) time.sleep(10)
sql = "select count(*) from sta" sql = "select count(*) from sta"
# loop wait max 60s to check count is ok # loop wait max 60s to check count is ok