Merge pull request #26233 from taosdata/fix/TS-5067-3.0
fix:[TS-5067] check if consumer belong to this cgroup:topic where drop cgroup
This commit is contained in:
commit
f9be1e9c00
|
@ -542,6 +542,8 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
|
|||
int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
|
||||
if (contLen < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
sdbCancelFetch(pMnode->pSdb, pDetail);
|
||||
sdbRelease(pMnode->pSdb, pDetail);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -1125,11 +1125,22 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
if (pNewDb->cfg.withArbitrator) {
|
||||
SArbGroup arbGroup = {0};
|
||||
mndArbGroupInitFromVgObj(&newVgroup, &arbGroup);
|
||||
if (mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup) != 0) return -1;
|
||||
if (mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup) != 0) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
taosArrayDestroy(pArray);
|
||||
return -1;
|
||||
}
|
||||
|
||||
} else {
|
||||
SArbGroup arbGroup = {0};
|
||||
mndArbGroupInitFromVgObj(pVgroup, &arbGroup);
|
||||
if (mndSetDropArbGroupCommitLogs(pTrans, &arbGroup) != 0) return -1;
|
||||
if (mndSetDropArbGroupCommitLogs(pTrans, &arbGroup) != 0) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
taosArrayDestroy(pArray);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -537,7 +537,7 @@ SMqSubscribeObj *tNewSubscribeObj(const char *key) {
|
|||
memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
taosInitRWLatch(&pSubObj->lock);
|
||||
pSubObj->vgNum = 0;
|
||||
pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||
|
||||
// TODO set hash free fp
|
||||
/*taosHashSetFreeFp(pSubObj->consumerHash, tDeleteSMqConsumerEp);*/
|
||||
|
@ -557,7 +557,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
|||
pSubNew->withMeta = pSub->withMeta;
|
||||
|
||||
pSubNew->vgNum = pSub->vgNum;
|
||||
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||
// TODO set hash free fp
|
||||
/*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
|
||||
void *pIter = NULL;
|
||||
|
|
|
@ -799,6 +799,29 @@ static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// This function only works when there are dirty consumers
|
||||
static void checkConsumer(SMnode *pMnode, SMqSubscribeObj* pSub){
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pConsumerEp->consumerId);
|
||||
if (pConsumer != NULL) {
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
continue;
|
||||
}
|
||||
mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
|
||||
taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs);
|
||||
|
||||
taosArrayDestroy(pConsumerEp->vgs);
|
||||
taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput){
|
||||
const char *key = rebInput->pRebInfo->key;
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, key);
|
||||
|
@ -834,8 +857,9 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu
|
|||
mInfo("[rebalance] sub topic:%s has no consumers sub yet", key);
|
||||
} else {
|
||||
taosRLockLatch(&pSub->lock);
|
||||
rebInput->oldConsumerNum = taosHashGetSize(pSub->consumerHash);
|
||||
rebOutput->pSub = tCloneSubscribeObj(pSub);
|
||||
checkConsumer(pMnode, rebOutput->pSub);
|
||||
rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
|
||||
mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);
|
||||
|
@ -910,6 +934,7 @@ END:
|
|||
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){
|
||||
void* pIter = NULL;
|
||||
SVgObj* pVgObj = NULL;
|
||||
int32_t ret = 0;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgObj);
|
||||
if (pIter == NULL) {
|
||||
|
@ -923,8 +948,8 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran
|
|||
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
|
||||
if(pReq == NULL){
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
sdbRelease(pMnode->pSdb, pVgObj);
|
||||
return -1;
|
||||
ret = -1;
|
||||
goto END;
|
||||
}
|
||||
pReq->head.vgId = htonl(pVgObj->vgId);
|
||||
pReq->vgId = pVgObj->vgId;
|
||||
|
@ -940,33 +965,50 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran
|
|||
|
||||
sdbRelease(pMnode->pSdb, pVgObj);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
ret = -1;
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
END:
|
||||
sdbRelease(pMnode->pSdb, pVgObj);
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic){
|
||||
void *pIter = NULL;
|
||||
SMqConsumerObj *pConsumer = NULL;
|
||||
int ret = 0;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (strcmp(cgroup, pConsumer->cgroup) == 0 && taosArrayGetSize(pConsumer->currentTopics) == 0) {
|
||||
// drop consumer in lost status, other consumers not in lost status already deleted by rebalance
|
||||
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) {
|
||||
sdbRelease(pMnode->pSdb, pConsumer);
|
||||
continue;
|
||||
}
|
||||
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
|
||||
if (strcmp(topic, name) == 0) {
|
||||
int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer);
|
||||
if (code != 0) {
|
||||
ret = code;
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pMnode->pSdb, pConsumer);
|
||||
}
|
||||
|
||||
END:
|
||||
sdbRelease(pMnode->pSdb, pConsumer);
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
sdbRelease(pMnode->pSdb, pConsumer);
|
||||
}
|
||||
return 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
||||
|
|
|
@ -496,7 +496,71 @@ class TDTestCase:
|
|||
consumer.close()
|
||||
print("consume_ts_4551 ok")
|
||||
|
||||
def consume_TS_5067_Test(self):
|
||||
tdSql.execute(f'create database if not exists d1 vgroups 1')
|
||||
tdSql.execute(f'use d1')
|
||||
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
|
||||
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
|
||||
tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
|
||||
tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
|
||||
tdSql.execute(f'insert into t1 using st tags(1) values(now+5s, 11) (now+10s, 12)')
|
||||
|
||||
tdSql.query("select * from st")
|
||||
tdSql.checkRows(8)
|
||||
|
||||
tdSql.execute(f'create topic t1 as select * from st')
|
||||
tdSql.execute(f'create topic t2 as select * from st')
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"auto.offset.reset": "earliest",
|
||||
}
|
||||
consumer = Consumer(consumer_dict)
|
||||
|
||||
try:
|
||||
consumer.subscribe(["t1"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
index = 0
|
||||
try:
|
||||
while True:
|
||||
res = consumer.poll(1)
|
||||
if not res:
|
||||
if index != 1:
|
||||
tdLog.exit("consume error")
|
||||
break
|
||||
val = res.value()
|
||||
if val is None:
|
||||
continue
|
||||
cnt = 0;
|
||||
for block in val:
|
||||
cnt += len(block.fetchall())
|
||||
|
||||
if cnt != 8:
|
||||
tdLog.exit("consume error")
|
||||
|
||||
index += 1
|
||||
finally:
|
||||
consumer.close()
|
||||
|
||||
consumer1 = Consumer(consumer_dict)
|
||||
try:
|
||||
consumer1.subscribe(["t2"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
tdSql.execute(f'drop consumer group g1 on t1')
|
||||
tdSql.query(f'show consumers')
|
||||
tdSql.checkRows(1)
|
||||
consumer1.close()
|
||||
tdSql.execute(f'drop topic t1')
|
||||
tdSql.execute(f'drop topic t2')
|
||||
tdSql.execute(f'drop database d1')
|
||||
|
||||
def run(self):
|
||||
self.consume_TS_5067_Test()
|
||||
self.consumeTest()
|
||||
self.consume_ts_4544()
|
||||
self.consume_ts_4551()
|
||||
|
|
Loading…
Reference in New Issue