feat:[TD-30270] checkout subscribe logic if unsubscribe twice
This commit is contained in:
parent
4554b3b33e
commit
bd83023b09
|
@ -962,6 +962,7 @@ int32_t taosGetErrSize();
|
||||||
#define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013)
|
#define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013)
|
||||||
#define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014)
|
#define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014)
|
||||||
#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015)
|
#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015)
|
||||||
|
#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016)
|
||||||
|
|
||||||
// stream
|
// stream
|
||||||
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
||||||
|
|
|
@ -511,6 +511,11 @@ static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerOb
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// no topics need to be rebalanced
|
||||||
|
if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
|
||||||
|
code = TSDB_CODE_TMQ_NO_NEED_REBALANCE;
|
||||||
|
}
|
||||||
|
|
||||||
END:
|
END:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -603,7 +608,8 @@ END:
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
tDeleteSMqConsumerObj(pConsumerNew);
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
||||||
return code;
|
|
||||||
|
return code == TSDB_CODE_TMQ_NO_NEED_REBALANCE ? 0 : code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
||||||
|
|
|
@ -806,6 +806,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance")
|
||||||
|
|
||||||
// stream
|
// stream
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
||||||
|
|
|
@ -229,6 +229,7 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/basic5.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/basic5.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb.py -N 3 -n 3
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb.py -N 3 -n 3
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ts-4674.py -N 3 -n 3
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ts-4674.py -N 3 -n 3
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-30270.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb1.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb1.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb2.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb2.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb3.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb3.py
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
|
||||||
|
import taos
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
from util.common import *
|
||||||
|
from taos.tmq import *
|
||||||
|
sys.path.append("./7-tmq")
|
||||||
|
from tmqCommon import *
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
clientCfgDict = {'debugFlag': 135}
|
||||||
|
updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict}
|
||||||
|
# updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict, 'tmqRowSize':1}
|
||||||
|
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
|
||||||
|
def consume_test(self):
|
||||||
|
|
||||||
|
tdSql.execute(f'create database if not exists d1')
|
||||||
|
tdSql.execute(f'use d1')
|
||||||
|
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
|
||||||
|
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
|
||||||
|
tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
|
||||||
|
tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
|
||||||
|
|
||||||
|
|
||||||
|
tdSql.execute(f'create topic topic_all as select * from st')
|
||||||
|
consumer_dict = {
|
||||||
|
"group.id": "g1",
|
||||||
|
"td.connect.user": "root",
|
||||||
|
"td.connect.pass": "taosdata",
|
||||||
|
"auto.offset.reset": "earliest",
|
||||||
|
}
|
||||||
|
consumer = Consumer(consumer_dict)
|
||||||
|
|
||||||
|
try:
|
||||||
|
consumer.unsubscribe()
|
||||||
|
consumer.unsubscribe()
|
||||||
|
consumer.subscribe(["topic_all"])
|
||||||
|
consumer.subscribe(["topic_all"])
|
||||||
|
except TmqError:
|
||||||
|
tdLog.exit(f"subscribe error")
|
||||||
|
|
||||||
|
cnt = 0
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
res = consumer.poll(2)
|
||||||
|
if not res:
|
||||||
|
break
|
||||||
|
val = res.value()
|
||||||
|
if val is None:
|
||||||
|
print(f"null val")
|
||||||
|
continue
|
||||||
|
for block in val:
|
||||||
|
cnt += len(block.fetchall())
|
||||||
|
|
||||||
|
print(f"block {cnt} rows")
|
||||||
|
|
||||||
|
finally:
|
||||||
|
consumer.unsubscribe();
|
||||||
|
consumer.close()
|
||||||
|
def run(self):
|
||||||
|
self.consume_test()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue