From 8fc8aad1b5b8b5ced999d94f42c8b7a5db564e69 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 23 Aug 2023 16:44:29 +0800 Subject: [PATCH 1/9] fix:[TD-258900]modify tmq trans conflict to db level --- source/dnode/mnode/impl/src/mndConsumer.c | 56 +------- source/dnode/mnode/impl/src/mndSubscribe.c | 17 +-- source/dnode/mnode/impl/src/mndTopic.c | 152 +++++++++++---------- 3 files changed, 87 insertions(+), 138 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 82492f930e..14344b52e7 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -37,7 +37,6 @@ static const char *mndConsumerStatusName(int status); static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer); -static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg); static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); @@ -45,7 +44,6 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg); static int32_t mndProcessAskEpReq(SRpcMsg *pMsg); static int32_t mndProcessMqHbReq(SRpcMsg *pMsg); static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg); -static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg); static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg); static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg); @@ -64,7 +62,6 @@ int32_t mndInitConsumer(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg); -// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg); @@ -122,49 +119,6 @@ void mndRebCntDec() { } } -//static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { -// SMnode *pMnode = pMsg->info.node; -// SMqConsumerLostMsg *pLostMsg = pMsg->pCont; -// SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId); -// if (pConsumer == NULL) { -// return 0; -// } -// -// mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status, -// mndConsumerStatusName(pConsumer->status)); -// -// if (pConsumer->status != MQ_CONSUMER_STATUS_READY) { -// mndReleaseConsumer(pMnode, pConsumer); -// return -1; -// } -// -// SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); -// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; -// -// mndReleaseConsumer(pMnode, pConsumer); -// -// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm"); -// if (pTrans == NULL) { -// goto FAIL; -// } -// -// if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { -// goto FAIL; -// } -// -// if (mndTransPrepare(pMnode, pTrans) != 0) { -// goto FAIL; -// } -// -// tDeleteSMqConsumerObj(pConsumerNew, true); -// mndTransDrop(pTrans); -// return 0; -//FAIL: -// tDeleteSMqConsumerObj(pConsumerNew, true); -// mndTransDrop(pTrans); -// return -1; -//} - static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; @@ -221,13 +175,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mndConsumerStatusName(pConsumer->status)); -// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { -// mndReleaseConsumer(pMnode, pConsumer); -// return -1; -// } - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); -// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; mndReleaseConsumer(pMnode, pConsumer); @@ -629,7 +577,7 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj return 0; } -static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) { +static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) { int32_t numOfTopics = taosArrayGetSize(pTopicList); for (int32_t i = 0; i < numOfTopics; i++) { @@ -722,7 +670,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over; if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; - } else { int32_t status = atomic_load_32(&pExistedConsumer->status); @@ -802,7 +749,6 @@ _over: tDeleteSMqConsumerObj(pConsumerNew, true); - // TODO: replace with destroy subscribe msg taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); return code; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 53f22f6e60..9579a18fc4 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -553,7 +553,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu } } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pMsg, "tmq-reb"); if (pTrans == NULL) { nodesDestroyNode((SNode*)pPlan); return -1; @@ -1019,8 +1019,8 @@ int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) { if (pIter == NULL) break; - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; + char topic[TSDB_TOPIC_FNAME_LEN] = {0}; + char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(pSub->key, topic, cgroup, true); if (strcmp(topic, topicName) != 0) { sdbRelease(pSdb, pSub); @@ -1084,7 +1084,6 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { } int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) { - int32_t code = -1; SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; @@ -1093,8 +1092,8 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); if (pIter == NULL) break; - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; + char topic[TSDB_TOPIC_FNAME_LEN] = {0}; + char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(pSub->key, topic, cgroup, true); if (strcmp(topic, topicName) != 0) { sdbRelease(pSdb, pSub); @@ -1132,15 +1131,13 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) { sdbRelease(pSdb, pSub); sdbCancelFetch(pSdb, pIter); - goto END; + return -1; } sdbRelease(pSdb, pSub); } - code = 0; -END: - return code; + return 0; } static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){ diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 621a80338d..e7e631fcae 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -381,14 +381,26 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * int32_t code = -1; SNode *pAst = NULL; SQueryPlan *pPlan = NULL; - SMqTopicObj topicObj = {0}; + + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "create-topic"); + if (pTrans == NULL) { + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + goto _OUT; + } + + mndTransSetDbName(pTrans, pDb->name, NULL); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { + goto _OUT; + } + mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name); + tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN); if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj) != 0) { - return -1; + goto _OUT; } topicObj.createTime = taosGetTimestampMs(); @@ -469,18 +481,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * /*topicObj.withTbName = 1;*/ /*topicObj.withSchema = 1;*/ - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-topic"); - if (pTrans == NULL) { - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - goto _OUT; - } - - mndTransSetDbName(pTrans, pDb->name, NULL); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - goto _OUT; - } - mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name); - SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); @@ -654,30 +654,55 @@ _OVER: } static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; SMDropTopicReq dropReq = {0}; + int32_t code = 0; + SMqTopicObj *pTopic = NULL; + STrans *pTrans = NULL; if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = -1; + goto end; } - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name); + pTopic = mndAcquireTopic(pMnode, dropReq.name); if (pTopic == NULL) { if (dropReq.igNotExists) { mInfo("topic:%s, not exist, ignore not exist is set", dropReq.name); - return 0; + goto end; } else { terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); - return -1; + code = -1; + goto end; } } - if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic) != 0) { - mndReleaseTopic(pMnode, pTopic); - return -1; + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "drop-topic"); + if (pTrans == NULL) { + mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); + code = -1; + goto end; + } + + mndTransSetDbName(pTrans, pTopic->db, NULL); + code = mndTransCheckConflict(pMnode, pTrans); + if (code != 0) { + goto end; + } + + mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); + + code = mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic); + if (code != 0) { + goto end; + } + + code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db); + if (code != 0) { + goto end; } void *pIter = NULL; @@ -688,37 +713,41 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { break; } - if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){ - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); - mndReleaseConsumer(pMnode, pConsumer); - continue; - } - + bool found = false; int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { char *name = taosArrayGetP(pConsumer->assignedTopics, i); if (strcmp(name, pTopic->name) == 0) { - mndReleaseConsumer(pMnode, pConsumer); - mndReleaseTopic(pMnode, pTopic); - sdbCancelFetch(pSdb, pIter); - terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", - dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - return -1; + found = true; + break; } } + if (found){ + if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + continue; + } + + mndReleaseConsumer(pMnode, pConsumer); + sdbCancelFetch(pSdb, pIter); + terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", + dropReq.name, pConsumer->consumerId, pConsumer->cgroup); + code = -1; + goto end; + } sz = taosArrayGetSize(pConsumer->rebNewTopics); for (int32_t i = 0; i < sz; i++) { char *name = taosArrayGetP(pConsumer->rebNewTopics, i); if (strcmp(name, pTopic->name) == 0) { mndReleaseConsumer(pMnode, pConsumer); - mndReleaseTopic(pMnode, pTopic); sdbCancelFetch(pSdb, pIter); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - return -1; + code = -1; + goto end; } } @@ -727,45 +756,22 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i); if (strcmp(name, pTopic->name) == 0) { mndReleaseConsumer(pMnode, pConsumer); - mndReleaseTopic(pMnode, pTopic); sdbCancelFetch(pSdb, pIter); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)", dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - return -1; + code = -1; + goto end; } } sdbRelease(pSdb, pConsumer); } - if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) { - mndReleaseTopic(pMnode, pTopic); - return -1; - } - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "drop-topic"); - if (pTrans == NULL) { + code = mndDropSubByTopic(pMnode, pTrans, dropReq.name); + if ( code < 0) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); - mndReleaseTopic(pMnode, pTopic); - return -1; - } - - mndTransSetDbName(pTrans, pTopic->db, NULL); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - mndReleaseTopic(pMnode, pTopic); - mndTransDrop(pTrans); - return -1; - } - - mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); - - // TODO check if rebalancing - if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { - mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); - mndTransDrop(pTrans); - mndReleaseTopic(pMnode, pTopic); - return -1; + goto end; } if (pTopic->ntbUid != 0) { @@ -791,25 +797,25 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { action.pCont = buf; action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN; action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + code = mndTransAppendRedoAction(pTrans, &action); + if (code != 0) { taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); - mndReleaseTopic(pMnode, pTopic); sdbCancelFetch(pSdb, pIter); - mndTransDrop(pTrans); - return -1; + goto end; } sdbRelease(pSdb, pVgroup); } } - int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic); + code = mndDropTopic(pMnode, pTrans, pReq, pTopic); + +end: mndReleaseTopic(pMnode, pTopic); mndTransDrop(pTrans); - if (code != 0) { mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); - return -1; + return code; } return TSDB_CODE_ACTION_IN_PROGRESS; From f19c1ea3fe1f9bfd711eabddb009ad6a1fed9103 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 23 Aug 2023 16:46:33 +0800 Subject: [PATCH 2/9] fix:[TD-258900]modify tmq trans conflict to db level --- source/dnode/mnode/impl/src/mndConsumer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 14344b52e7..d5b7342768 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -577,7 +577,7 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj return 0; } -static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) { +static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) { int32_t numOfTopics = taosArrayGetSize(pTopicList); for (int32_t i = 0; i < numOfTopics; i++) { From 4591f2474617f12cd2c6d6f63eef7f08fd13ca6f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 23 Aug 2023 19:41:02 +0800 Subject: [PATCH 3/9] fix:memory leak --- source/dnode/mnode/impl/src/mndTopic.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index e7e631fcae..cb18c0bc65 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -725,6 +725,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { if (found){ if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + mndReleaseConsumer(pMnode, pConsumer); continue; } From 8f3956cc673ffca957616055b6929f69ff54c4d3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 23 Aug 2023 16:44:29 +0800 Subject: [PATCH 4/9] fix:[TD-258900]modify tmq trans conflict to db level --- source/dnode/mnode/impl/src/mndConsumer.c | 56 +------- source/dnode/mnode/impl/src/mndSubscribe.c | 17 +-- source/dnode/mnode/impl/src/mndTopic.c | 152 +++++++++++---------- 3 files changed, 87 insertions(+), 138 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 82492f930e..14344b52e7 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -37,7 +37,6 @@ static const char *mndConsumerStatusName(int status); static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer); -static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg); static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); @@ -45,7 +44,6 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg); static int32_t mndProcessAskEpReq(SRpcMsg *pMsg); static int32_t mndProcessMqHbReq(SRpcMsg *pMsg); static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg); -static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg); static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg); static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg); @@ -64,7 +62,6 @@ int32_t mndInitConsumer(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg); -// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg); @@ -122,49 +119,6 @@ void mndRebCntDec() { } } -//static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { -// SMnode *pMnode = pMsg->info.node; -// SMqConsumerLostMsg *pLostMsg = pMsg->pCont; -// SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId); -// if (pConsumer == NULL) { -// return 0; -// } -// -// mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status, -// mndConsumerStatusName(pConsumer->status)); -// -// if (pConsumer->status != MQ_CONSUMER_STATUS_READY) { -// mndReleaseConsumer(pMnode, pConsumer); -// return -1; -// } -// -// SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); -// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; -// -// mndReleaseConsumer(pMnode, pConsumer); -// -// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm"); -// if (pTrans == NULL) { -// goto FAIL; -// } -// -// if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { -// goto FAIL; -// } -// -// if (mndTransPrepare(pMnode, pTrans) != 0) { -// goto FAIL; -// } -// -// tDeleteSMqConsumerObj(pConsumerNew, true); -// mndTransDrop(pTrans); -// return 0; -//FAIL: -// tDeleteSMqConsumerObj(pConsumerNew, true); -// mndTransDrop(pTrans); -// return -1; -//} - static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; @@ -221,13 +175,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mndConsumerStatusName(pConsumer->status)); -// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { -// mndReleaseConsumer(pMnode, pConsumer); -// return -1; -// } - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); -// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; mndReleaseConsumer(pMnode, pConsumer); @@ -629,7 +577,7 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj return 0; } -static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) { +static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) { int32_t numOfTopics = taosArrayGetSize(pTopicList); for (int32_t i = 0; i < numOfTopics; i++) { @@ -722,7 +670,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over; if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; - } else { int32_t status = atomic_load_32(&pExistedConsumer->status); @@ -802,7 +749,6 @@ _over: tDeleteSMqConsumerObj(pConsumerNew, true); - // TODO: replace with destroy subscribe msg taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); return code; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 53f22f6e60..9579a18fc4 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -553,7 +553,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu } } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pMsg, "tmq-reb"); if (pTrans == NULL) { nodesDestroyNode((SNode*)pPlan); return -1; @@ -1019,8 +1019,8 @@ int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) { if (pIter == NULL) break; - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; + char topic[TSDB_TOPIC_FNAME_LEN] = {0}; + char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(pSub->key, topic, cgroup, true); if (strcmp(topic, topicName) != 0) { sdbRelease(pSdb, pSub); @@ -1084,7 +1084,6 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { } int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) { - int32_t code = -1; SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; @@ -1093,8 +1092,8 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); if (pIter == NULL) break; - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; + char topic[TSDB_TOPIC_FNAME_LEN] = {0}; + char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(pSub->key, topic, cgroup, true); if (strcmp(topic, topicName) != 0) { sdbRelease(pSdb, pSub); @@ -1132,15 +1131,13 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) { sdbRelease(pSdb, pSub); sdbCancelFetch(pSdb, pIter); - goto END; + return -1; } sdbRelease(pSdb, pSub); } - code = 0; -END: - return code; + return 0; } static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){ diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 621a80338d..e7e631fcae 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -381,14 +381,26 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * int32_t code = -1; SNode *pAst = NULL; SQueryPlan *pPlan = NULL; - SMqTopicObj topicObj = {0}; + + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "create-topic"); + if (pTrans == NULL) { + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + goto _OUT; + } + + mndTransSetDbName(pTrans, pDb->name, NULL); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { + goto _OUT; + } + mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name); + tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN); if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj) != 0) { - return -1; + goto _OUT; } topicObj.createTime = taosGetTimestampMs(); @@ -469,18 +481,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * /*topicObj.withTbName = 1;*/ /*topicObj.withSchema = 1;*/ - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-topic"); - if (pTrans == NULL) { - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - goto _OUT; - } - - mndTransSetDbName(pTrans, pDb->name, NULL); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - goto _OUT; - } - mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name); - SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); @@ -654,30 +654,55 @@ _OVER: } static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; SMDropTopicReq dropReq = {0}; + int32_t code = 0; + SMqTopicObj *pTopic = NULL; + STrans *pTrans = NULL; if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = -1; + goto end; } - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name); + pTopic = mndAcquireTopic(pMnode, dropReq.name); if (pTopic == NULL) { if (dropReq.igNotExists) { mInfo("topic:%s, not exist, ignore not exist is set", dropReq.name); - return 0; + goto end; } else { terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); - return -1; + code = -1; + goto end; } } - if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic) != 0) { - mndReleaseTopic(pMnode, pTopic); - return -1; + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "drop-topic"); + if (pTrans == NULL) { + mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); + code = -1; + goto end; + } + + mndTransSetDbName(pTrans, pTopic->db, NULL); + code = mndTransCheckConflict(pMnode, pTrans); + if (code != 0) { + goto end; + } + + mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); + + code = mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic); + if (code != 0) { + goto end; + } + + code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db); + if (code != 0) { + goto end; } void *pIter = NULL; @@ -688,37 +713,41 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { break; } - if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){ - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); - mndReleaseConsumer(pMnode, pConsumer); - continue; - } - + bool found = false; int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { char *name = taosArrayGetP(pConsumer->assignedTopics, i); if (strcmp(name, pTopic->name) == 0) { - mndReleaseConsumer(pMnode, pConsumer); - mndReleaseTopic(pMnode, pTopic); - sdbCancelFetch(pSdb, pIter); - terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", - dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - return -1; + found = true; + break; } } + if (found){ + if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + continue; + } + + mndReleaseConsumer(pMnode, pConsumer); + sdbCancelFetch(pSdb, pIter); + terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", + dropReq.name, pConsumer->consumerId, pConsumer->cgroup); + code = -1; + goto end; + } sz = taosArrayGetSize(pConsumer->rebNewTopics); for (int32_t i = 0; i < sz; i++) { char *name = taosArrayGetP(pConsumer->rebNewTopics, i); if (strcmp(name, pTopic->name) == 0) { mndReleaseConsumer(pMnode, pConsumer); - mndReleaseTopic(pMnode, pTopic); sdbCancelFetch(pSdb, pIter); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - return -1; + code = -1; + goto end; } } @@ -727,45 +756,22 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i); if (strcmp(name, pTopic->name) == 0) { mndReleaseConsumer(pMnode, pConsumer); - mndReleaseTopic(pMnode, pTopic); sdbCancelFetch(pSdb, pIter); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)", dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - return -1; + code = -1; + goto end; } } sdbRelease(pSdb, pConsumer); } - if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) { - mndReleaseTopic(pMnode, pTopic); - return -1; - } - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "drop-topic"); - if (pTrans == NULL) { + code = mndDropSubByTopic(pMnode, pTrans, dropReq.name); + if ( code < 0) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); - mndReleaseTopic(pMnode, pTopic); - return -1; - } - - mndTransSetDbName(pTrans, pTopic->db, NULL); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - mndReleaseTopic(pMnode, pTopic); - mndTransDrop(pTrans); - return -1; - } - - mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); - - // TODO check if rebalancing - if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { - mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); - mndTransDrop(pTrans); - mndReleaseTopic(pMnode, pTopic); - return -1; + goto end; } if (pTopic->ntbUid != 0) { @@ -791,25 +797,25 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { action.pCont = buf; action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN; action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + code = mndTransAppendRedoAction(pTrans, &action); + if (code != 0) { taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); - mndReleaseTopic(pMnode, pTopic); sdbCancelFetch(pSdb, pIter); - mndTransDrop(pTrans); - return -1; + goto end; } sdbRelease(pSdb, pVgroup); } } - int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic); + code = mndDropTopic(pMnode, pTrans, pReq, pTopic); + +end: mndReleaseTopic(pMnode, pTopic); mndTransDrop(pTrans); - if (code != 0) { mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); - return -1; + return code; } return TSDB_CODE_ACTION_IN_PROGRESS; From 775f0668c5c57f5497a068b3d8e357096ee371cf Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 23 Aug 2023 16:46:33 +0800 Subject: [PATCH 5/9] fix:[TD-258900]modify tmq trans conflict to db level --- source/dnode/mnode/impl/src/mndConsumer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 14344b52e7..d5b7342768 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -577,7 +577,7 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj return 0; } -static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) { +static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) { int32_t numOfTopics = taosArrayGetSize(pTopicList); for (int32_t i = 0; i < numOfTopics; i++) { From be74370e988fa625f56dacf469e0e408ac3d0a64 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 23 Aug 2023 19:41:02 +0800 Subject: [PATCH 6/9] fix:memory leak --- source/dnode/mnode/impl/src/mndTopic.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index e7e631fcae..cb18c0bc65 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -725,6 +725,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { if (found){ if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + mndReleaseConsumer(pMnode, pConsumer); continue; } From 49216ffa54f211adafc0567602aeb4a668a67d9a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 24 Aug 2023 16:32:15 +0800 Subject: [PATCH 7/9] fix:transaction in tmq --- source/dnode/mnode/impl/inc/mndDef.h | 2 + source/dnode/mnode/impl/src/mndConsumer.c | 57 ++++++++++++---------- source/dnode/mnode/impl/src/mndSubscribe.c | 32 +++++++----- source/dnode/mnode/impl/src/mndTopic.c | 16 ++++-- source/dnode/mnode/impl/src/mndTrans.c | 16 ++++++ source/dnode/vnode/src/tq/tq.c | 15 +----- 6 files changed, 80 insertions(+), 58 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index c83a40c25d..4656e0555f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -99,6 +99,8 @@ typedef enum { TRN_CONFLICT_GLOBAL = 1, TRN_CONFLICT_DB = 2, TRN_CONFLICT_DB_INSIDE = 3, + TRN_CONFLICT_TOPIC = 4, + TRN_CONFLICT_TOPIC_INSIDE = 5, } ETrnConflct; typedef enum { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index d5b7342768..2e78d03884 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -119,6 +119,27 @@ void mndRebCntDec() { } } +static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode) { + int32_t numOfTopics = taosArrayGetSize(pTopicList); + + for (int32_t i = 0; i < numOfTopics; i++) { + char *pOneTopic = taosArrayGetP(pTopicList, i); + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic); + if (pTopic == NULL) { // terrno has been set by callee function + return -1; + } + + mndTransSetDbName(pTrans, pOneTopic, NULL); + if(mndTransCheckConflict(pMnode, pTrans) != 0){ + mndReleaseTopic(pMnode, pTopic); + return -1; + } + mndReleaseTopic(pMnode, pTopic); + } + + return 0; +} + static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; @@ -142,10 +163,13 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { mndReleaseConsumer(pMnode, pConsumer); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm"); if (pTrans == NULL) { goto FAIL; } + if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode) != 0){ + goto FAIL; + } if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; @@ -179,9 +203,11 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { mndReleaseConsumer(pMnode, pConsumer); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pMsg, "clear-csm"); if (pTrans == NULL) goto FAIL; - + if (validateTopics(pTrans, pConsumer->assignedTopics, pMnode) != 0){ + goto FAIL; + } // this is the drop action, not the update action if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; @@ -577,27 +603,6 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj return 0; } -static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) { - int32_t numOfTopics = taosArrayGetSize(pTopicList); - - for (int32_t i = 0; i < numOfTopics; i++) { - char *pOneTopic = taosArrayGetP(pTopicList, i); - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic); - if (pTopic == NULL) { // terrno has been set by callee function - return -1; - } - - if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { - mndReleaseTopic(pMnode, pTopic); - return -1; - } - - mndReleaseTopic(pMnode, pTopic); - } - - return 0; -} - static void *topicNameDup(void *p) { return taosStrdup((char *)p); } static void freeItem(void *param) { @@ -636,12 +641,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } // check topic existence - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe"); if (pTrans == NULL) { goto _over; } - code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user); + code = validateTopics(pTrans, pTopicList, pMnode); if (code != TSDB_CODE_SUCCESS) { goto _over; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 9579a18fc4..b4145ae8d0 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -553,13 +553,17 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu } } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pMsg, "tmq-reb"); + char topic[TSDB_TOPIC_FNAME_LEN] = {0}; + char cgroup[TSDB_CGROUP_LEN] = {0}; + mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) { nodesDestroyNode((SNode*)pPlan); return -1; } - mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL); + mndTransSetDbName(pTrans, topic, cgroup); if (mndTransCheckConflict(pMnode, pTrans) != 0) { mndTransDrop(pTrans); nodesDestroyNode((SNode*)pPlan); @@ -587,10 +591,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu return -1; } - char topic[TSDB_TOPIC_FNAME_LEN] = {0}; - char cgroup[TSDB_CGROUP_LEN] = {0}; - mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); - // 3. commit log: consumer to update status and epoch // 3.1 set touched consumer int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers); @@ -802,6 +802,19 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { goto end; } + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "drop-cgroup"); + if (pTrans == NULL) { + mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); + code = -1; + goto end; + } + + mndTransSetDbName(pTrans, dropReq.topic, dropReq.cgroup); + code = mndTransCheckConflict(pMnode, pTrans); + if (code != 0) { + goto end; + } + void *pIter = NULL; SMqConsumerObj *pConsumer; while (1) { @@ -816,13 +829,6 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { sdbRelease(pMnode->pSdb, pConsumer); } - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup"); - if (pTrans == NULL) { - mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); - code = -1; - goto end; - } - mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index cb18c0bc65..13c9b7d176 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -383,14 +383,15 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * SQueryPlan *pPlan = NULL; SMqTopicObj topicObj = {0}; - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "create-topic"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "create-topic"); if (pTrans == NULL) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); goto _OUT; } - mndTransSetDbName(pTrans, pDb->name, NULL); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { + mndTransSetDbName(pTrans, pCreate->name, NULL); + code = mndTransCheckConflict(pMnode, pTrans); + if (code != 0) { goto _OUT; } mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name); @@ -661,6 +662,11 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { SMqTopicObj *pTopic = NULL; STrans *pTrans = NULL; + if (!mndRebTryStart()) { + mInfo("mq rebalance already in progress, do nothing"); + return 0; + } + if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; code = -1; @@ -680,14 +686,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } } - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "drop-topic"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "drop-topic"); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); code = -1; goto end; } - mndTransSetDbName(pTrans, pTopic->db, NULL); + mndTransSetDbName(pTrans, pTopic->name, NULL); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto end; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 7ebaf6dda5..849b14255c 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -792,6 +792,22 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { } } + if (pNew->conflict == TRN_CONFLICT_TOPIC) { + if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; + if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { + if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true; + } + } + if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) { + if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; + if (pTrans->conflict == TRN_CONFLICT_TOPIC ) { + if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true; + } + if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { + if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) conflict = true; + } + } + if (conflict) { mError("trans:%d, db:%s stb:%s type:%d, can't execute since conflict with trans:%d db:%s stb:%s type:%d", pNew->id, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 815e9647b5..3396803f08 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -879,20 +879,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); - // atomic_add_fetch_32(&pHandle->epoch, 1); - - // kill executing task - // if(tqIsHandleExec(pHandle)) { - // qTaskInfo_t pTaskInfo = pHandle->execHandle.task; - // if (pTaskInfo != NULL) { - // qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); - // } - - // if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - // qStreamCloseTsdbReader(pTaskInfo); - // } - // } - // remove if it has been register in the push manager, and return one empty block to consumer + atomic_store_32(&pHandle->epoch, 0); tqUnregisterPushHandle(pTq, pHandle); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); } From cca00406a55965c86aea61d2d3061b0a58265e67 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 24 Aug 2023 18:16:57 +0800 Subject: [PATCH 8/9] fix:drop topic error if topic not exist --- source/dnode/mnode/impl/src/mndTopic.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index a3f1a100d0..eacdaa1665 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -664,20 +664,18 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - code = -1; - goto end; + return -1; } pTopic = mndAcquireTopic(pMnode, dropReq.name); if (pTopic == NULL) { if (dropReq.igNotExists) { mInfo("topic:%s, not exist, ignore not exist is set", dropReq.name); - goto end; + return 0; } else { terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); - code = -1; - goto end; + return -1; } } From 8b745de397575003717aa5bd109d6b408aabf076 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 26 Aug 2023 00:43:47 +0800 Subject: [PATCH 9/9] fix:transaction error --- source/dnode/mnode/impl/src/mndConsumer.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index e83235fd74..f25bd2cffb 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -212,11 +212,8 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { mndReleaseConsumer(pMnode, pConsumer); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pMsg, "clear-csm"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm"); if (pTrans == NULL) goto FAIL; - if (validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user) != 0){ - goto FAIL; - } // this is the drop action, not the update action if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;