diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index dffab1b163..1386b0b82f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -469,6 +469,13 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI } SStreamScanInfo* pScanInfo = pInfo->info; + if (pInfo->pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { // clear meta cache for subscription if tag is changed + for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) { + int64_t* uid = (int64_t*)taosArrayGet(tableIdList, i); + STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info; + taosLRUCacheErase(pTableScanInfo->base.metaCache.pTableMetaEntryCache, uid, LONG_BYTES); + } + } if (isAdd) { // add new table id SArray* qa = NULL; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index ea92f0bef7..97509e453a 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -329,6 +329,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5906.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-32187.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py diff --git a/tests/system-test/7-tmq/tmq_ts5906.py b/tests/system-test/7-tmq/tmq_ts5906.py new file mode 100644 index 0000000000..13e756baa3 --- /dev/null +++ b/tests/system-test/7-tmq/tmq_ts5906.py @@ -0,0 +1,90 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from taos.tmq import * +from taos import * + +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 143, 'asynclog': 0} + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def test(self): + tdSql.execute(f'create database if not exists db vgroups 1') + tdSql.execute(f'use db') + tdSql.execute(f'CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)') + tdSql.execute("INSERT INTO d1001 USING meters TAGS('California.SanFrancisco1', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)") + + + tdSql.execute(f'create topic t0 as select * from meters') + + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["t0"]) + except TmqError: + tdLog.exit(f"subscribe error") + + index = 0; + try: + while True: + if index == 2: + break + res = consumer.poll(5) + print(res) + if not res: + print("res null") + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(f"data len: {len(data)}") + print(element) + if index == 0 and data[0][-1] != 2: + tdLog.exit(f"error: {data[0][-1]}") + if index == 1 and data[0][-1] != 100: + tdLog.exit(f"error: {data[0][-1]}") + + tdSql.execute("alter table d1001 set tag groupId = 100") + tdSql.execute("INSERT INTO d1001 VALUES('2018-10-05 14:38:06.000',10.30000,219,0.31000)") + index += 1 + finally: + consumer.close() + + + def run(self): + self.test() + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())