fix:[TS-5067] check if consumer belong to this cgroup:topic where drop cgroup

This commit is contained in:
wangmm0220 2024-06-21 14:11:33 +08:00
parent 52111f9cab
commit ddcf7c74f7
3 changed files with 88 additions and 4 deletions

View File

@ -537,7 +537,7 @@ SMqSubscribeObj *tNewSubscribeObj(const char *key) {
memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
taosInitRWLatch(&pSubObj->lock);
pSubObj->vgNum = 0;
pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
// TODO set hash free fp
/*taosHashSetFreeFp(pSubObj->consumerHash, tDeleteSMqConsumerEp);*/
@ -557,7 +557,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
pSubNew->withMeta = pSub->withMeta;
pSubNew->vgNum = pSub->vgNum;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
// TODO set hash free fp
/*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
void *pIter = NULL;

View File

@ -799,6 +799,28 @@ static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
return 0;
}
// This function only works when there are dirty consumers
static void checkConsumer(SMnode *pMnode, SMqSubscribeObj* pSub){
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) {
break;
}
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pConsumerEp->consumerId);
if (pConsumer != NULL) {
continue;
}
mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs);
taosArrayDestroy(pConsumerEp->vgs);
taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
}
}
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput){
const char *key = rebInput->pRebInfo->key;
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, key);
@ -834,8 +856,9 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu
mInfo("[rebalance] sub topic:%s has no consumers sub yet", key);
} else {
taosRLockLatch(&pSub->lock);
rebInput->oldConsumerNum = taosHashGetSize(pSub->consumerHash);
rebOutput->pSub = tCloneSubscribeObj(pSub);
rebOutput.pSub = tCloneSubscribeObj(pSub);
checkConsumer(pMnode, rebOutput.pSub);
rebInput.oldConsumerNum = taosHashGetSize(rebOutput.pSub->consumerHash);
taosRUnLockLatch(&pSub->lock);
mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);

View File

@ -496,7 +496,68 @@ class TDTestCase:
consumer.close()
print("consume_ts_4551 ok")
def consume_TS_5067_Test(self):
tdSql.execute(f'create database if not exists d1 vgroups 1')
tdSql.execute(f'use d1')
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t1 using st tags(1) values(now+5s, 11) (now+10s, 12)')
tdSql.query("select * from st")
tdSql.checkRows(8)
tdSql.execute(f'create topic t1 as select * from st')
tdSql.execute(f'create topic t2 as select * from st')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_dict)
try:
consumer.subscribe(["t1"])
except TmqError:
tdLog.exit(f"subscribe error")
index = 0
try:
while True:
res = consumer.poll(1)
if not res:
if index != 1:
tdLog.exit("consume error")
break
val = res.value()
if val is None:
continue
cnt = 0;
for block in val:
cnt += len(block.fetchall())
if cnt != 8:
tdLog.exit("consume error")
index += 1
finally:
consumer.close()
consumer1 = Consumer(consumer_dict)
try:
consumer1.subscribe(["t2"])
except TmqError:
tdLog.exit(f"subscribe error")
tdSql.execute(f'drop consumer group g1 on t1')
tdSql.query(f'show consumers')
tdSql.checkRows(1)
consumer1.close()
def run(self):
self.consume_TS_5067_Test()
self.consumeTest()
self.consume_ts_4544()
self.consume_ts_4551()