diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 0068b582cf..9f84e25c9f 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -912,6 +912,11 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran int32_t sz = taosArrayGetSize(pSub->unassignedVgs); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId); + if (pVgObj == NULL) { + mError("sendDeleteSubToVnode %s failed since vg %d doesn't exist", pSub->key, pVgEp->vgId); + continue; + } SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); if(pReq == NULL){ terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -922,17 +927,12 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran pReq->consumerId = -1; memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId); - if (pVgObj == NULL) { - taosMemoryFree(pReq); - terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST; - return -1; - } STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj);; action.pCont = pReq; action.contLen = sizeof(SMqVDeleteReq); action.msgType = TDMT_VND_TMQ_DELETE_SUB; + action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST; mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 79f53e6dec..18442f182b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -649,21 +649,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); - STqHandle* pHandle = NULL; - while (1) { - pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - if (pHandle) { - break; - } - taosRLockLatch(&pTq->lock); - ret = tqMetaGetHandle(pTq, req.subKey); - taosRUnLockLatch(&pTq->lock); - - if (ret < 0) { - break; - } - } - + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { if (req.oldConsumerId != -1) { tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64, @@ -698,10 +684,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); - atomic_store_64(&pHandle->consumerId, req.newConsumerId); - atomic_store_32(&pHandle->epoch, 0); tqUnregisterPushHandle(pTq, pHandle); - ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + taosHashRemove(pTq->pHandle, pHandle->subKey, strlen(pHandle->subKey)); + + // update handle to avoid req->qmsg changed if spilt vnode is failed + STqHandle handle = {0}; + ret = tqCreateHandle(pTq, &req, &handle); + if (ret < 0) { + tqDestroyTqHandle(&handle); + goto end; + } + ret = tqMetaSaveHandle(pTq, req.subKey, &handle); } taosWUnLockLatch(&pTq->lock); break; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 76322c527f..cb64c9033a 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -351,7 +351,6 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); handle->consumerId = req->newConsumerId; - handle->epoch = -1; handle->execHandle.subType = req->subType; handle->fetchMeta = req->withMeta; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 71e6771370..7375478e61 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -87,7 +87,7 @@ int tqUnregisterPushHandle(STQ* pTq, void *handle) { int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); tqInfo("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); - if(pHandle->msg != NULL) { + if(ret == 0 && pHandle->msg != NULL) { // tqPushDataRsp(pHandle, vgId); tqPushEmptyDataRsp(pHandle, vgId); diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py index 694227cea7..be99d01a5c 100644 --- a/tests/system-test/6-cluster/clusterCommonCheck.py +++ b/tests/system-test/6-cluster/clusterCommonCheck.py @@ -237,21 +237,19 @@ class ClusterComCheck: last_number=vgroup_numbers-1 while count < count_number: time.sleep(1) + count+=1 + print("check vgroup count :", count) tdSql.query(f"show {db_name}.vgroups;") - if count == 0 : - if tdSql.checkRows(vgroup_numbers) : - tdLog.success(f"{db_name} has {vgroup_numbers} vgroups" ) - else: - tdLog.exit(f"vgroup number of {db_name} is not correct") + if tdSql.getRows() != vgroup_numbers : + continue if self.db_replica == 1 : if tdSql.queryResult[0][4] == 'leader' and tdSql.queryResult[last_number][4] == 'leader': tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';") print("db replica :",tdSql.queryResult[0][0]) if tdSql.queryResult[0][0] == db_replica: - ready_time= (count + 1) - tdLog.success(f"all vgroups with replica {self.db_replica} of {db_name} are leaders in {count + 1} s") + tdLog.success(f"all vgroups with replica {self.db_replica} of {db_name} are leaders in {count} s") return True - count+=1 + elif self.db_replica == 3 : vgroup_status_first=[tdSql.queryResult[0][4],tdSql.queryResult[0][6],tdSql.queryResult[0][8]] @@ -261,10 +259,8 @@ class ClusterComCheck: tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';") print("db replica :",tdSql.queryResult[0][0]) if tdSql.queryResult[0][0] == db_replica: - ready_time= (count + 1) - tdLog.success(f"elections of {db_name}.vgroups with replica {self.db_replica} are ready in {ready_time} s") + tdLog.success(f"elections of {db_name}.vgroups with replica {self.db_replica} are ready in {count} s") return True - count+=1 else: tdLog.debug(tdSql.queryResult) tdLog.notice(f"elections of {db_name} all vgroups with replica {self.db_replica} are failed in {count} s ") diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py index f01bf2558c..a5e61adc8d 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py @@ -200,12 +200,11 @@ class TDTestCase: # tmqCom.checkFileContent(consumerId, queryString) - time.sleep(2) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) - if deleteWal == True: - clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) + clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) + tdLog.printNoPrefix("======== test case 1 end ...... ") def run(self): diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py index 5e11de04cb..eb35ebc718 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py @@ -199,13 +199,11 @@ class TDTestCase: tdLog.exit("%d tmq consume rows error!"%consumerId) # tmqCom.checkFileContent(consumerId, queryString) + clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) - time.sleep(2) + time.sleep(3) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) - - if deleteWal == True: - clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) tdLog.printNoPrefix("======== test case 1 end ...... ") def run(self):