fix:[TD-258900]modify tmq trans conflict to db level
This commit is contained in:
parent
d5c4d3888e
commit
8f3956cc67
|
@ -37,7 +37,6 @@ static const char *mndConsumerStatusName(int status);
|
|||
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
|
||||
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
|
||||
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
|
||||
|
||||
|
@ -45,7 +44,6 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
|
|||
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
|
||||
|
||||
|
@ -64,7 +62,6 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
|
||||
// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
|
||||
|
||||
|
@ -122,49 +119,6 @@ void mndRebCntDec() {
|
|||
}
|
||||
}
|
||||
|
||||
//static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
|
||||
// SMnode *pMnode = pMsg->info.node;
|
||||
// SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
|
||||
// SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
|
||||
// if (pConsumer == NULL) {
|
||||
// return 0;
|
||||
// }
|
||||
//
|
||||
// mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
|
||||
// mndConsumerStatusName(pConsumer->status));
|
||||
//
|
||||
// if (pConsumer->status != MQ_CONSUMER_STATUS_READY) {
|
||||
// mndReleaseConsumer(pMnode, pConsumer);
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
// SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
|
||||
// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
|
||||
//
|
||||
// mndReleaseConsumer(pMnode, pConsumer);
|
||||
//
|
||||
// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
|
||||
// if (pTrans == NULL) {
|
||||
// goto FAIL;
|
||||
// }
|
||||
//
|
||||
// if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
||||
// goto FAIL;
|
||||
// }
|
||||
//
|
||||
// if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
// goto FAIL;
|
||||
// }
|
||||
//
|
||||
// tDeleteSMqConsumerObj(pConsumerNew, true);
|
||||
// mndTransDrop(pTrans);
|
||||
// return 0;
|
||||
//FAIL:
|
||||
// tDeleteSMqConsumerObj(pConsumerNew, true);
|
||||
// mndTransDrop(pTrans);
|
||||
// return -1;
|
||||
//}
|
||||
|
||||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
|
||||
|
@ -221,13 +175,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
|
|||
mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
|
||||
mndConsumerStatusName(pConsumer->status));
|
||||
|
||||
// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
|
||||
// mndReleaseConsumer(pMnode, pConsumer);
|
||||
// return -1;
|
||||
// }
|
||||
|
||||
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
|
||||
// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
|
||||
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
|
||||
|
@ -629,7 +577,7 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
|
||||
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
|
||||
int32_t numOfTopics = taosArrayGetSize(pTopicList);
|
||||
|
||||
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||
|
@ -722,7 +670,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
|
||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
|
||||
|
||||
} else {
|
||||
int32_t status = atomic_load_32(&pExistedConsumer->status);
|
||||
|
||||
|
@ -802,7 +749,6 @@ _over:
|
|||
|
||||
tDeleteSMqConsumerObj(pConsumerNew, true);
|
||||
|
||||
// TODO: replace with destroy subscribe msg
|
||||
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -553,7 +553,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
|||
}
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pMsg, "tmq-reb");
|
||||
if (pTrans == NULL) {
|
||||
nodesDestroyNode((SNode*)pPlan);
|
||||
return -1;
|
||||
|
@ -1019,8 +1019,8 @@ int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
|
|||
if (pIter == NULL) break;
|
||||
|
||||
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
|
||||
char cgroup[TSDB_CGROUP_LEN] = {0};
|
||||
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
|
||||
if (strcmp(topic, topicName) != 0) {
|
||||
sdbRelease(pSdb, pSub);
|
||||
|
@ -1084,7 +1084,6 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
|||
}
|
||||
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
|
||||
int32_t code = -1;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
void *pIter = NULL;
|
||||
|
@ -1093,8 +1092,8 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
|
|||
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
|
||||
char cgroup[TSDB_CGROUP_LEN] = {0};
|
||||
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
|
||||
if (strcmp(topic, topicName) != 0) {
|
||||
sdbRelease(pSdb, pSub);
|
||||
|
@ -1132,15 +1131,13 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
|
|||
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
|
||||
sdbRelease(pSdb, pSub);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
goto END;
|
||||
return -1;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pSub);
|
||||
}
|
||||
|
||||
code = 0;
|
||||
END:
|
||||
return code;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){
|
||||
|
|
|
@ -381,14 +381,26 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
int32_t code = -1;
|
||||
SNode *pAst = NULL;
|
||||
SQueryPlan *pPlan = NULL;
|
||||
|
||||
SMqTopicObj topicObj = {0};
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "create-topic");
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
goto _OUT;
|
||||
}
|
||||
mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name);
|
||||
|
||||
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN);
|
||||
|
||||
if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj) != 0) {
|
||||
return -1;
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
topicObj.createTime = taosGetTimestampMs();
|
||||
|
@ -469,18 +481,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
/*topicObj.withTbName = 1;*/
|
||||
/*topicObj.withSchema = 1;*/
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-topic");
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
goto _OUT;
|
||||
}
|
||||
mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name);
|
||||
|
||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
|
@ -654,30 +654,55 @@ _OVER:
|
|||
}
|
||||
|
||||
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SMDropTopicReq dropReq = {0};
|
||||
int32_t code = 0;
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
STrans *pTrans = NULL;
|
||||
|
||||
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
|
||||
pTopic = mndAcquireTopic(pMnode, dropReq.name);
|
||||
if (pTopic == NULL) {
|
||||
if (dropReq.igNotExists) {
|
||||
mInfo("topic:%s, not exist, ignore not exist is set", dropReq.name);
|
||||
return 0;
|
||||
goto end;
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
|
||||
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||
return -1;
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic) != 0) {
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
return -1;
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "drop-topic");
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
|
||||
mndTransSetDbName(pTrans, pTopic->db, NULL);
|
||||
code = mndTransCheckConflict(pMnode, pTrans);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||
|
||||
code = mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
void *pIter = NULL;
|
||||
|
@ -688,37 +713,41 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
break;
|
||||
}
|
||||
|
||||
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){
|
||||
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
continue;
|
||||
}
|
||||
|
||||
bool found = false;
|
||||
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
|
||||
if (strcmp(name, pTopic->name) == 0) {
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||
return -1;
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (found){
|
||||
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
|
||||
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
|
||||
continue;
|
||||
}
|
||||
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
|
||||
sz = taosArrayGetSize(pConsumer->rebNewTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *name = taosArrayGetP(pConsumer->rebNewTopics, i);
|
||||
if (strcmp(name, pTopic->name) == 0) {
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
|
||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||
return -1;
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -727,45 +756,22 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i);
|
||||
if (strcmp(name, pTopic->name) == 0) {
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)",
|
||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||
return -1;
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
}
|
||||
|
||||
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
return -1;
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "drop-topic");
|
||||
if (pTrans == NULL) {
|
||||
code = mndDropSubByTopic(pMnode, pTrans, dropReq.name);
|
||||
if ( code < 0) {
|
||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mndTransSetDbName(pTrans, pTopic->db, NULL);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||
|
||||
// TODO check if rebalancing
|
||||
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
return -1;
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (pTopic->ntbUid != 0) {
|
||||
|
@ -791,25 +797,25 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
action.pCont = buf;
|
||||
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
|
||||
action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
code = mndTransAppendRedoAction(pTrans, &action);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(buf);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
goto end;
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
|
||||
code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
|
||||
|
||||
end:
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
mndTransDrop(pTrans);
|
||||
|
||||
if (code != 0) {
|
||||
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
|
Loading…
Reference in New Issue