Merge pull request #26980 from taosdata/fix/TD-31115

fix:[TD-31115] modify trans conflict level in tmq
This commit is contained in:
dapan1121 2024-08-06 14:11:47 +08:00 committed by GitHub
commit eda09d700a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 12 deletions

View File

@ -87,19 +87,18 @@ END:
return code; return code;
} }
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser, static int32_t validateTopics(STrans* pTrans, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *pUser) {
bool enableReplay) {
SMqTopicObj *pTopic = NULL; SMqTopicObj *pTopic = NULL;
int32_t code = 0; int32_t code = 0;
int32_t numOfTopics = taosArrayGetSize(pTopicList); int32_t numOfTopics = taosArrayGetSize(subscribe->topicNames);
for (int32_t i = 0; i < numOfTopics; i++) { for (int32_t i = 0; i < numOfTopics; i++) {
char *pOneTopic = taosArrayGetP(pTopicList, i); char *pOneTopic = taosArrayGetP(subscribe->topicNames, i);
MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic)); MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic));
MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic)); MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic));
MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION)); MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION));
if (enableReplay) { if (subscribe->enableReplay) {
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT; code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
goto END; goto END;
@ -117,6 +116,10 @@ static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const ch
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
} }
} }
char key[TSDB_CONSUMER_ID_LEN] = {0};
(void)snprintf(key, TSDB_CONSUMER_ID_LEN, "%"PRIx64, subscribe->consumerId);
mndTransSetDbName(pTrans, pTopic->db, key);
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
} }
return 0; return 0;
@ -128,7 +131,6 @@ END:
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqConsumerClearMsg *pClearMsg = pMsg->pCont; SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
SMqConsumerObj *pConsumerNew = NULL; SMqConsumerObj *pConsumerNew = NULL;
@ -563,16 +565,19 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SCMSubscribeReq subscribe = {0}; SCMSubscribeReq subscribe = {0};
MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen)); MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));
if(taosArrayGetSize(subscribe.topicNames) == 0){ bool ubSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
if(ubSubscribe){
SMqConsumerObj *pConsumerTmp = NULL; SMqConsumerObj *pConsumerTmp = NULL;
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp)); MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
mndReleaseConsumer(pMnode, pConsumerTmp); mndReleaseConsumer(pMnode, pConsumerTmp);
} }
MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames)); MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY,
(ubSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE),
pMsg, "subscribe");
MND_TMQ_NULL_CHECK(pTrans); MND_TMQ_NULL_CHECK(pTrans);
MND_TMQ_RETURN_CHECK(validateTopics(subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay)); MND_TMQ_RETURN_CHECK(validateTopics(pTrans, &subscribe, pMnode, pMsg->info.conn.user));
MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew)); MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew));
MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew)); MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans)); MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

View File

@ -650,7 +650,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
goto END; goto END;
} }
mndTransSetDbName(pTrans, pOutput->pSub->dbName, cgroup); mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans)); MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
// 1. redo action: action to all vg // 1. redo action: action to all vg
@ -1079,10 +1079,10 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
goto END; goto END;
} }
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "drop-cgroup"); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
MND_TMQ_NULL_CHECK(pTrans); MND_TMQ_NULL_CHECK(pTrans);
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
mndTransSetDbName(pTrans, pSub->dbName, dropReq.cgroup); mndTransSetDbName(pTrans, pSub->dbName, NULL);
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans)); MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans)); MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub)); MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));