Merge pull request #29567 from taosdata/fix/TS-5906

fix:[TS-5906]clear meta cache for subscription if tag is changed
This commit is contained in:
Shengliang Guan 2025-01-15 10:39:19 +08:00 committed by GitHub
commit 97a091a8bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 98 additions and 0 deletions

View File

@ -469,6 +469,13 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI
}
SStreamScanInfo* pScanInfo = pInfo->info;
if (pInfo->pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { // clear meta cache for subscription if tag is changed
for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
int64_t* uid = (int64_t*)taosArrayGet(tableIdList, i);
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
taosLRUCacheErase(pTableScanInfo->base.metaCache.pTableMetaEntryCache, uid, LONG_BYTES);
}
}
if (isAdd) { // add new table id
SArray* qa = NULL;

View File

@ -330,6 +330,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td33504.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5906.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-32187.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py

View File

@ -0,0 +1,90 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from taos.tmq import *
from taos import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'debugFlag': 143, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def test(self):
tdSql.execute(f'create database if not exists db vgroups 1')
tdSql.execute(f'use db')
tdSql.execute(f'CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)')
tdSql.execute("INSERT INTO d1001 USING meters TAGS('California.SanFrancisco1', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)")
tdSql.execute(f'create topic t0 as select * from meters')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_dict)
try:
consumer.subscribe(["t0"])
except TmqError:
tdLog.exit(f"subscribe error")
index = 0;
try:
while True:
if index == 2:
break
res = consumer.poll(5)
print(res)
if not res:
print("res null")
break
val = res.value()
if val is None:
continue
for block in val:
data = block.fetchall()
for element in data:
print(f"data len: {len(data)}")
print(element)
if index == 0 and data[0][-1] != 2:
tdLog.exit(f"error: {data[0][-1]}")
if index == 1 and data[0][-1] != 100:
tdLog.exit(f"error: {data[0][-1]}")
tdSql.execute("alter table d1001 set tag groupId = 100")
tdSql.execute("INSERT INTO d1001 VALUES('2018-10-05 14:38:06.000',10.30000,219,0.31000)")
index += 1
finally:
consumer.close()
def run(self):
self.test()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())