From 70db803aecef56c68094f185f48af4993ee65eab Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 11 Jun 2024 17:24:29 +0800 Subject: [PATCH] fix:[TD-30365] ci case error & drop topic error if vnode is splitted --- source/dnode/mnode/impl/inc/mndDef.h | 4 +-- source/dnode/mnode/impl/src/mndConsumer.c | 15 ++++------ source/dnode/mnode/impl/src/mndSubscribe.c | 32 +++++++++++--------- source/dnode/mnode/impl/src/mndTopic.c | 8 ++--- source/dnode/mnode/impl/src/mndTrans.c | 34 +++++++++++----------- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 29 ++++++++++-------- source/dnode/vnode/src/tq/tqMeta.c | 6 ++-- 8 files changed, 67 insertions(+), 63 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 5c21e9b22b..81772635fc 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -102,8 +102,8 @@ typedef enum { TRN_CONFLICT_GLOBAL = 1, TRN_CONFLICT_DB = 2, TRN_CONFLICT_DB_INSIDE = 3, - TRN_CONFLICT_TOPIC = 4, - TRN_CONFLICT_TOPIC_INSIDE = 5, +// TRN_CONFLICT_TOPIC = 4, +// TRN_CONFLICT_TOPIC_INSIDE = 5, TRN_CONFLICT_ARBGROUP = 6, } ETrnConflct; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 3eef2afcc1..9a7a8155ec 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -91,7 +91,7 @@ void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SR } } -static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, +static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) { SMqTopicObj *pTopic = NULL; int32_t code = 0; @@ -135,11 +135,6 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode * } } - mndTransSetDbName(pTrans, pOneTopic, NULL); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - code = -1; - goto FAILED; - } mndReleaseTopic(pMnode, pTopic); } @@ -177,12 +172,12 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { goto END; } - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); if (pTrans == NULL) { code = -1; goto END; } - code = validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); + code = validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); if (code != 0) { goto END; } @@ -675,13 +670,13 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto _over; } - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); if (pTrans == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _over; } - code = validateTopics(pTrans, subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay); + code = validateTopics(subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay); if (code != TSDB_CODE_SUCCESS) { goto _over; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 9f84e25c9f..ffb723756c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -618,13 +618,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto END; } - mndTransSetDbName(pTrans, topic, cgroup); + mndTransSetDbName(pTrans, pOutput->pSub->dbName, cgroup); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto END; @@ -908,22 +908,26 @@ END: } static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){ - // iter all vnode to delete handle - int32_t sz = taosArrayGetSize(pSub->unassignedVgs); - for (int32_t i = 0; i < sz; i++) { - SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId); - if (pVgObj == NULL) { - mError("sendDeleteSubToVnode %s failed since vg %d doesn't exist", pSub->key, pVgEp->vgId); + void* pIter = NULL; + SVgObj* pVgObj = NULL; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgObj); + if (pIter == NULL) { + break; + } + + if (!mndVgroupInDb(pVgObj, pSub->dbUid)) { + sdbRelease(pMnode->pSdb, pVgObj); continue; } SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); if(pReq == NULL){ terrno = TSDB_CODE_OUT_OF_MEMORY; + sdbRelease(pMnode->pSdb, pVgObj); return -1; } - pReq->head.vgId = htonl(pVgEp->vgId); - pReq->vgId = pVgEp->vgId; + pReq->head.vgId = htonl(pVgObj->vgId); + pReq->vgId = pVgObj->vgId; pReq->consumerId = -1; memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); @@ -934,7 +938,7 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran action.msgType = TDMT_VND_TMQ_DELETE_SUB; action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST; - mndReleaseVgroup(pMnode, pVgObj); + sdbRelease(pMnode->pSdb, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -996,7 +1000,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { goto end; } - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "drop-cgroup"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "drop-cgroup"); if (pTrans == NULL) { mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); code = -1; @@ -1004,7 +1008,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { } mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); - mndTransSetDbName(pTrans, dropReq.topic, dropReq.cgroup); + mndTransSetDbName(pTrans, pSub->dbName, dropReq.cgroup); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto end; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 8a06b4a613..bcb38a3902 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -422,14 +422,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * SQueryPlan *pPlan = NULL; SMqTopicObj topicObj = {0}; - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "create-topic"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "create-topic"); if (pTrans == NULL) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); code = -1; goto _OUT; } - mndTransSetDbName(pTrans, pCreate->name, NULL); + mndTransSetDbName(pTrans, pDb->name, NULL); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto _OUT; @@ -779,14 +779,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } } - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "drop-topic"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-topic"); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); code = -1; goto end; } - mndTransSetDbName(pTrans, pTopic->name, NULL); + mndTransSetDbName(pTrans, pTopic->db, NULL); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto end; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 8b01d296a3..d80164bcad 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -830,26 +830,26 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { } } - if (pNew->conflict == TRN_CONFLICT_TOPIC) { - if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; - if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { - if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; - } - } - if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) { - if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; - if (pTrans->conflict == TRN_CONFLICT_TOPIC) { - if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; - } - if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { - if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) - conflict = true; - } - } +// if (pNew->conflict == TRN_CONFLICT_TOPIC) { +// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; +// if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { +// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; +// } +// } +// if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) { +// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; +// if (pTrans->conflict == TRN_CONFLICT_TOPIC) { +// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; +// } +// if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { +// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) +// conflict = true; +// } +// } if (pNew->conflict == TRN_CONFLICT_ARBGROUP) { if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) { - void *pIter = taosHashIterate(pNew->arbGroupIds, NULL); + pIter = taosHashIterate(pNew->arbGroupIds, NULL); while (pIter != NULL) { int32_t groupId = *(int32_t *)pIter; if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 68d966add6..08d32b2b81 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -133,7 +133,7 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); int32_t tqMetaRestoreCheckInfo(STQ* pTq); int32_t tqMetaGetHandle(STQ* pTq, const char* key); -int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t snapshotVer); +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); STqOffsetStore* tqOffsetOpen(STQ* pTq); int32_t tqMetaTransform(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 58085e2cc2..083e0b28f3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -649,7 +649,19 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); - STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + STqHandle* pHandle = NULL; + while (1) { + pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + if (pHandle) { + break; + } + taosRLockLatch(&pTq->lock); + ret = tqMetaGetHandle(pTq, req.subKey); + taosRUnLockLatch(&pTq->lock); + if (ret < 0) { + break; + } + } if (pHandle == NULL) { if (req.oldConsumerId != -1) { tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64, @@ -660,7 +672,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg goto end; } STqHandle handle = {0}; - ret = tqCreateHandle(pTq, &req, &handle, walGetCommittedVer(pTq->pVnode->pWal)); + ret = tqCreateHandle(pTq, &req, &handle); if (ret < 0) { tqDestroyTqHandle(&handle); goto end; @@ -684,17 +696,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); + atomic_store_64(&pHandle->consumerId, req.newConsumerId); + atomic_store_32(&pHandle->epoch, 0); tqUnregisterPushHandle(pTq, pHandle); - - // update handle to avoid req->qmsg changed if spilt vnode is failed - STqHandle handle = {0}; - ret = tqCreateHandle(pTq, &req, &handle, pHandle->snapshotVer); - if (ret < 0) { - tqDestroyTqHandle(&handle); - taosWUnLockLatch(&pTq->lock); - goto end; - } - ret = tqMetaSaveHandle(pTq, req.subKey, &handle); + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); } taosWUnLockLatch(&pTq->lock); break; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 5162136591..ce3308f0ac 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -346,7 +346,7 @@ end: return code; } -int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t snapshotVer){ +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ int32_t vgId = TD_VID(pTq->pVnode); memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); @@ -364,12 +364,12 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t sn handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg); } - handle->snapshotVer = snapshotVer; + handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal); if(buildHandle(pTq, handle) < 0){ return -1; } - tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId); + tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, handle->consumerId, vgId, handle->snapshotVer); return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); }