diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c index abbcb52db4..308089a9c1 100644 --- a/source/dnode/mnode/impl/src/mndCompact.c +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -542,6 +542,8 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) { int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req); if (contLen < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; + sdbCancelFetch(pMnode->pSdb, pDetail); + sdbRelease(pMnode->pSdb, pDetail); continue; } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 78e3ceabce..88b38d5e38 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1125,11 +1125,22 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * if (pNewDb->cfg.withArbitrator) { SArbGroup arbGroup = {0}; mndArbGroupInitFromVgObj(&newVgroup, &arbGroup); - if (mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup) != 0) return -1; + if (mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + taosArrayDestroy(pArray); + return -1; + } + } else { SArbGroup arbGroup = {0}; mndArbGroupInitFromVgObj(pVgroup, &arbGroup); - if (mndSetDropArbGroupCommitLogs(pTrans, &arbGroup) != 0) return -1; + if (mndSetDropArbGroupCommitLogs(pTrans, &arbGroup) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + taosArrayDestroy(pArray); + return -1; + } } } } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 091edc6ab0..5164557184 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -537,7 +537,7 @@ SMqSubscribeObj *tNewSubscribeObj(const char *key) { memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN); taosInitRWLatch(&pSubObj->lock); pSubObj->vgNum = 0; - pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); // TODO set hash free fp /*taosHashSetFreeFp(pSubObj->consumerHash, tDeleteSMqConsumerEp);*/ @@ -557,7 +557,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { pSubNew->withMeta = pSub->withMeta; pSubNew->vgNum = pSub->vgNum; - pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); // TODO set hash free fp /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/ void *pIter = NULL; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ffb723756c..5f019d07f5 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -799,6 +799,29 @@ static int32_t initRebOutput(SMqRebOutputObj *rebOutput) { return 0; } +// This function only works when there are dirty consumers +static void checkConsumer(SMnode *pMnode, SMqSubscribeObj* pSub){ + void *pIter = NULL; + while (1) { + pIter = taosHashIterate(pSub->consumerHash, pIter); + if (pIter == NULL) { + break; + } + + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pConsumerEp->consumerId); + if (pConsumer != NULL) { + mndReleaseConsumer(pMnode, pConsumer); + continue; + } + mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId); + taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs); + + taosArrayDestroy(pConsumerEp->vgs); + taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)); + } +} + static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput){ const char *key = rebInput->pRebInfo->key; SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, key); @@ -834,8 +857,9 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu mInfo("[rebalance] sub topic:%s has no consumers sub yet", key); } else { taosRLockLatch(&pSub->lock); - rebInput->oldConsumerNum = taosHashGetSize(pSub->consumerHash); rebOutput->pSub = tCloneSubscribeObj(pSub); + checkConsumer(pMnode, rebOutput->pSub); + rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash); taosRUnLockLatch(&pSub->lock); mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum); @@ -910,6 +934,7 @@ END: static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){ void* pIter = NULL; SVgObj* pVgObj = NULL; + int32_t ret = 0; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgObj); if (pIter == NULL) { @@ -923,8 +948,8 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); if(pReq == NULL){ terrno = TSDB_CODE_OUT_OF_MEMORY; - sdbRelease(pMnode->pSdb, pVgObj); - return -1; + ret = -1; + goto END; } pReq->head.vgId = htonl(pVgObj->vgId); pReq->vgId = pVgObj->vgId; @@ -940,33 +965,50 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran sdbRelease(pMnode->pSdb, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; + ret = -1; + goto END; } } - return 0; + END: + sdbRelease(pMnode->pSdb, pVgObj); + sdbCancelFetch(pMnode->pSdb, pIter); + return ret; } static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic){ void *pIter = NULL; SMqConsumerObj *pConsumer = NULL; + int ret = 0; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); if (pIter == NULL) { break; } - if (strcmp(cgroup, pConsumer->cgroup) == 0 && taosArrayGetSize(pConsumer->currentTopics) == 0) { - int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer); - if (code != 0) { - sdbRelease(pMnode->pSdb, pConsumer); - sdbCancelFetch(pMnode->pSdb, pIter); - return code; + // drop consumer in lost status, other consumers not in lost status already deleted by rebalance + if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) { + sdbRelease(pMnode->pSdb, pConsumer); + continue; + } + int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); + for (int32_t i = 0; i < sz; i++) { + char *name = taosArrayGetP(pConsumer->assignedTopics, i); + if (strcmp(topic, name) == 0) { + int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer); + if (code != 0) { + ret = code; + goto END; + } } } + sdbRelease(pMnode->pSdb, pConsumer); } - return 0; + +END: + sdbRelease(pMnode->pSdb, pConsumer); + sdbCancelFetch(pMnode->pSdb, pIter); + return ret; } static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 39b9c6d34a..49b62d8abb 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -496,7 +496,71 @@ class TDTestCase: consumer.close() print("consume_ts_4551 ok") + def consume_TS_5067_Test(self): + tdSql.execute(f'create database if not exists d1 vgroups 1') + tdSql.execute(f'use d1') + tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)') + tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t1 using st tags(1) values(now+5s, 11) (now+10s, 12)') + + tdSql.query("select * from st") + tdSql.checkRows(8) + + tdSql.execute(f'create topic t1 as select * from st') + tdSql.execute(f'create topic t2 as select * from st') + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["t1"]) + except TmqError: + tdLog.exit(f"subscribe error") + + index = 0 + try: + while True: + res = consumer.poll(1) + if not res: + if index != 1: + tdLog.exit("consume error") + break + val = res.value() + if val is None: + continue + cnt = 0; + for block in val: + cnt += len(block.fetchall()) + + if cnt != 8: + tdLog.exit("consume error") + + index += 1 + finally: + consumer.close() + + consumer1 = Consumer(consumer_dict) + try: + consumer1.subscribe(["t2"]) + except TmqError: + tdLog.exit(f"subscribe error") + + tdSql.execute(f'drop consumer group g1 on t1') + tdSql.query(f'show consumers') + tdSql.checkRows(1) + consumer1.close() + tdSql.execute(f'drop topic t1') + tdSql.execute(f'drop topic t2') + tdSql.execute(f'drop database d1') + def run(self): + self.consume_TS_5067_Test() self.consumeTest() self.consume_ts_4544() self.consume_ts_4551()