Merge pull request #22534 from taosdata/mark/tmq
fix:[TD-258900]modify tmq trans conflict to db level
This commit is contained in:
commit
b11975f00c
|
@ -33,7 +33,7 @@ enum {
|
||||||
|
|
||||||
int32_t mndInitConsumer(SMnode *pMnode);
|
int32_t mndInitConsumer(SMnode *pMnode);
|
||||||
void mndCleanupConsumer(SMnode *pMnode);
|
void mndCleanupConsumer(SMnode *pMnode);
|
||||||
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId);
|
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info);
|
||||||
|
|
||||||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
|
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
|
||||||
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
|
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
|
||||||
|
|
|
@ -99,6 +99,8 @@ typedef enum {
|
||||||
TRN_CONFLICT_GLOBAL = 1,
|
TRN_CONFLICT_GLOBAL = 1,
|
||||||
TRN_CONFLICT_DB = 2,
|
TRN_CONFLICT_DB = 2,
|
||||||
TRN_CONFLICT_DB_INSIDE = 3,
|
TRN_CONFLICT_DB_INSIDE = 3,
|
||||||
|
TRN_CONFLICT_TOPIC = 4,
|
||||||
|
TRN_CONFLICT_TOPIC_INSIDE = 5,
|
||||||
} ETrnConflct;
|
} ETrnConflct;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
|
|
@ -37,7 +37,6 @@ static const char *mndConsumerStatusName(int status);
|
||||||
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||||
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
|
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
|
||||||
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
|
|
||||||
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
|
@ -45,7 +44,6 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
|
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
|
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
|
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
|
|
||||||
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
|
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
|
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
|
||||||
|
|
||||||
|
@ -64,7 +62,6 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
|
||||||
// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
|
||||||
|
|
||||||
|
@ -76,7 +73,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
||||||
|
|
||||||
void mndCleanupConsumer(SMnode *pMnode) {}
|
void mndCleanupConsumer(SMnode *pMnode) {}
|
||||||
|
|
||||||
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
|
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info){
|
||||||
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
|
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
|
||||||
if (pClearMsg == NULL) {
|
if (pClearMsg == NULL) {
|
||||||
mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
|
mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
|
||||||
|
@ -85,7 +82,11 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
|
||||||
|
|
||||||
pClearMsg->consumerId = consumerId;
|
pClearMsg->consumerId = consumerId;
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};
|
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR,
|
||||||
|
.pCont = pClearMsg,
|
||||||
|
.contLen = sizeof(SMqConsumerClearMsg),
|
||||||
|
.info = *info,
|
||||||
|
};
|
||||||
|
|
||||||
mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId);
|
mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId);
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
|
@ -122,48 +123,31 @@ void mndRebCntDec() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
|
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
|
||||||
// SMnode *pMnode = pMsg->info.node;
|
int32_t numOfTopics = taosArrayGetSize(pTopicList);
|
||||||
// SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
|
|
||||||
// SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
|
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||||
// if (pConsumer == NULL) {
|
char *pOneTopic = taosArrayGetP(pTopicList, i);
|
||||||
// return 0;
|
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic);
|
||||||
// }
|
if (pTopic == NULL) { // terrno has been set by callee function
|
||||||
//
|
return -1;
|
||||||
// mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
|
}
|
||||||
// mndConsumerStatusName(pConsumer->status));
|
|
||||||
//
|
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
|
||||||
// if (pConsumer->status != MQ_CONSUMER_STATUS_READY) {
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
// mndReleaseConsumer(pMnode, pConsumer);
|
return -1;
|
||||||
// return -1;
|
}
|
||||||
// }
|
|
||||||
//
|
mndTransSetDbName(pTrans, pOneTopic, NULL);
|
||||||
// SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
|
if(mndTransCheckConflict(pMnode, pTrans) != 0){
|
||||||
// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
//
|
return -1;
|
||||||
// mndReleaseConsumer(pMnode, pConsumer);
|
}
|
||||||
//
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
|
}
|
||||||
// if (pTrans == NULL) {
|
|
||||||
// goto FAIL;
|
return 0;
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
|
||||||
// goto FAIL;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (mndTransPrepare(pMnode, pTrans) != 0) {
|
|
||||||
// goto FAIL;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// tDeleteSMqConsumerObj(pConsumerNew, true);
|
|
||||||
// mndTransDrop(pTrans);
|
|
||||||
// return 0;
|
|
||||||
//FAIL:
|
|
||||||
// tDeleteSMqConsumerObj(pConsumerNew, true);
|
|
||||||
// mndTransDrop(pTrans);
|
|
||||||
// return -1;
|
|
||||||
//}
|
|
||||||
|
|
||||||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
|
@ -188,10 +172,13 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user) != 0){
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
||||||
|
@ -221,19 +208,12 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
|
||||||
mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
|
mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
|
||||||
mndConsumerStatusName(pConsumer->status));
|
mndConsumerStatusName(pConsumer->status));
|
||||||
|
|
||||||
// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
|
|
||||||
// mndReleaseConsumer(pMnode, pConsumer);
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
|
|
||||||
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
|
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
|
||||||
// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
|
|
||||||
|
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
|
||||||
if (pTrans == NULL) goto FAIL;
|
if (pTrans == NULL) goto FAIL;
|
||||||
|
|
||||||
// this is the drop action, not the update action
|
// this is the drop action, not the update action
|
||||||
if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
||||||
|
@ -318,7 +298,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
if (status == MQ_CONSUMER_STATUS_READY) {
|
if (status == MQ_CONSUMER_STATUS_READY) {
|
||||||
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
|
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
|
||||||
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
|
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
|
||||||
} else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
|
} else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
|
||||||
taosRLockLatch(&pConsumer->lock);
|
taosRLockLatch(&pConsumer->lock);
|
||||||
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
|
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
|
||||||
|
@ -333,7 +313,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
} else if (status == MQ_CONSUMER_STATUS_LOST) {
|
} else if (status == MQ_CONSUMER_STATUS_LOST) {
|
||||||
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
|
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
|
||||||
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
|
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
|
||||||
}
|
}
|
||||||
} else { // MQ_CONSUMER_STATUS_REBALANCE
|
} else { // MQ_CONSUMER_STATUS_REBALANCE
|
||||||
taosRLockLatch(&pConsumer->lock);
|
taosRLockLatch(&pConsumer->lock);
|
||||||
|
@ -410,6 +390,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||||
.msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
|
.msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
|
||||||
.pCont = pRecoverMsg,
|
.pCont = pRecoverMsg,
|
||||||
.contLen = sizeof(SMqConsumerRecoverMsg),
|
.contLen = sizeof(SMqConsumerRecoverMsg),
|
||||||
|
.info = pMsg->info,
|
||||||
};
|
};
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
||||||
}
|
}
|
||||||
|
@ -484,6 +465,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
.msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
|
.msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
|
||||||
.pCont = pRecoverMsg,
|
.pCont = pRecoverMsg,
|
||||||
.contLen = sizeof(SMqConsumerRecoverMsg),
|
.contLen = sizeof(SMqConsumerRecoverMsg),
|
||||||
|
.info = pMsg->info,
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
||||||
|
@ -629,27 +611,6 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
|
|
||||||
int32_t numOfTopics = taosArrayGetSize(pTopicList);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTopics; i++) {
|
|
||||||
char *pOneTopic = taosArrayGetP(pTopicList, i);
|
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic);
|
|
||||||
if (pTopic == NULL) { // terrno has been set by callee function
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
|
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
|
||||||
|
|
||||||
static void freeItem(void *param) {
|
static void freeItem(void *param) {
|
||||||
|
@ -688,12 +649,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// check topic existence
|
// check topic existence
|
||||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
goto _over;
|
goto _over;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user);
|
code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _over;
|
goto _over;
|
||||||
}
|
}
|
||||||
|
@ -722,7 +683,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
int32_t status = atomic_load_32(&pExistedConsumer->status);
|
int32_t status = atomic_load_32(&pExistedConsumer->status);
|
||||||
|
|
||||||
|
@ -802,7 +762,6 @@ _over:
|
||||||
|
|
||||||
tDeleteSMqConsumerObj(pConsumerNew, true);
|
tDeleteSMqConsumerObj(pConsumerNew, true);
|
||||||
|
|
||||||
// TODO: replace with destroy subscribe msg
|
|
||||||
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -553,13 +553,17 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
|
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
|
||||||
|
char cgroup[TSDB_CGROUP_LEN] = {0};
|
||||||
|
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
|
||||||
|
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
nodesDestroyNode((SNode*)pPlan);
|
nodesDestroyNode((SNode*)pPlan);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
|
mndTransSetDbName(pTrans, topic, cgroup);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
nodesDestroyNode((SNode*)pPlan);
|
nodesDestroyNode((SNode*)pPlan);
|
||||||
|
@ -587,10 +591,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
|
|
||||||
char cgroup[TSDB_CGROUP_LEN] = {0};
|
|
||||||
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
|
|
||||||
|
|
||||||
// 3. commit log: consumer to update status and epoch
|
// 3. commit log: consumer to update status and epoch
|
||||||
// 3.1 set touched consumer
|
// 3.1 set touched consumer
|
||||||
int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
|
int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
|
||||||
|
@ -802,6 +802,19 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "drop-cgroup");
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
|
||||||
|
code = -1;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransSetDbName(pTrans, dropReq.topic, dropReq.cgroup);
|
||||||
|
code = mndTransCheckConflict(pMnode, pTrans);
|
||||||
|
if (code != 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SMqConsumerObj *pConsumer;
|
SMqConsumerObj *pConsumer;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -811,18 +824,11 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) {
|
if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) {
|
||||||
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
|
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
|
||||||
}
|
}
|
||||||
sdbRelease(pMnode->pSdb, pConsumer);
|
sdbRelease(pMnode->pSdb, pConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
|
|
||||||
if (pTrans == NULL) {
|
|
||||||
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
|
|
||||||
code = -1;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
|
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
|
||||||
|
|
||||||
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
||||||
|
@ -1019,8 +1025,8 @@ int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN] = {0};
|
||||||
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
|
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
|
||||||
if (strcmp(topic, topicName) != 0) {
|
if (strcmp(topic, topicName) != 0) {
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
|
@ -1084,7 +1090,6 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
|
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
|
||||||
int32_t code = -1;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -1093,8 +1098,8 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
|
||||||
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
|
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN] = {0};
|
||||||
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
|
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
|
||||||
if (strcmp(topic, topicName) != 0) {
|
if (strcmp(topic, topicName) != 0) {
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
|
@ -1132,15 +1137,13 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
|
||||||
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
|
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
goto END;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
return 0;
|
||||||
END:
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){
|
static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){
|
||||||
|
|
|
@ -381,14 +381,29 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SNode *pAst = NULL;
|
SNode *pAst = NULL;
|
||||||
SQueryPlan *pPlan = NULL;
|
SQueryPlan *pPlan = NULL;
|
||||||
|
|
||||||
SMqTopicObj topicObj = {0};
|
SMqTopicObj topicObj = {0};
|
||||||
|
|
||||||
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "create-topic");
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
|
code = -1;
|
||||||
|
goto _OUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransSetDbName(pTrans, pCreate->name, NULL);
|
||||||
|
code = mndTransCheckConflict(pMnode, pTrans);
|
||||||
|
if (code != 0) {
|
||||||
|
goto _OUT;
|
||||||
|
}
|
||||||
|
mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name);
|
||||||
|
|
||||||
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
||||||
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN);
|
tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN);
|
||||||
|
|
||||||
if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj) != 0) {
|
code = mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj);
|
||||||
return -1;
|
if (code != 0) {
|
||||||
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
topicObj.createTime = taosGetTimestampMs();
|
topicObj.createTime = taosGetTimestampMs();
|
||||||
|
@ -405,6 +420,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
if (pCreate->withMeta) {
|
if (pCreate->withMeta) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
|
code = terrno;
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -413,13 +429,15 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
|
|
||||||
qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast);
|
qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast);
|
||||||
|
|
||||||
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
|
code = nodesStringToNode(pCreate->ast, &pAst);
|
||||||
|
if (code != 0) {
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
|
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
|
||||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
|
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
|
||||||
|
if (code != 0) {
|
||||||
mError("failed to create topic:%s since %s", pCreate->name, terrstr());
|
mError("failed to create topic:%s since %s", pCreate->name, terrstr());
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
@ -427,6 +445,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
|
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
|
||||||
if (topicObj.ntbColIds == NULL) {
|
if (topicObj.ntbColIds == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
code = terrno;
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -437,12 +456,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
topicObj.ntbColIds = NULL;
|
topicObj.ntbColIds = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
|
code = qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema);
|
||||||
|
if (code != 0) {
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
|
code = nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL);
|
||||||
|
if (code != 0) {
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
@ -450,6 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
|
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
|
||||||
if (pStb == NULL) {
|
if (pStb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
||||||
|
code = terrno;
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -469,21 +491,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
/*topicObj.withTbName = 1;*/
|
/*topicObj.withTbName = 1;*/
|
||||||
/*topicObj.withSchema = 1;*/
|
/*topicObj.withSchema = 1;*/
|
||||||
|
|
||||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-topic");
|
|
||||||
if (pTrans == NULL) {
|
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
|
||||||
goto _OUT;
|
|
||||||
}
|
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
|
||||||
goto _OUT;
|
|
||||||
}
|
|
||||||
mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name);
|
|
||||||
|
|
||||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
|
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
|
||||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||||
|
code = -1;
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -509,7 +520,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
|
|
||||||
// encoder check alter info
|
// encoder check alter info
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int32_t code;
|
|
||||||
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
|
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
@ -524,6 +534,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
code = -1;
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
@ -538,6 +549,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
code = -1;
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
buf = NULL;
|
buf = NULL;
|
||||||
|
@ -547,6 +559,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
code = -1;
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -654,16 +667,19 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SMDropTopicReq dropReq = {0};
|
SMDropTopicReq dropReq = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
SMqTopicObj *pTopic = NULL;
|
||||||
|
STrans *pTrans = NULL;
|
||||||
|
|
||||||
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
|
pTopic = mndAcquireTopic(pMnode, dropReq.name);
|
||||||
if (pTopic == NULL) {
|
if (pTopic == NULL) {
|
||||||
if (dropReq.igNotExists) {
|
if (dropReq.igNotExists) {
|
||||||
mInfo("topic:%s, not exist, ignore not exist is set", dropReq.name);
|
mInfo("topic:%s, not exist, ignore not exist is set", dropReq.name);
|
||||||
|
@ -675,9 +691,29 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic) != 0) {
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "drop-topic");
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
if (pTrans == NULL) {
|
||||||
return -1;
|
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||||
|
code = -1;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransSetDbName(pTrans, pTopic->name, NULL);
|
||||||
|
code = mndTransCheckConflict(pMnode, pTrans);
|
||||||
|
if (code != 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||||
|
|
||||||
|
code = mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic);
|
||||||
|
if (code != 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db);
|
||||||
|
if (code != 0) {
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -688,37 +724,42 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){
|
bool found = false;
|
||||||
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
|
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
|
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
|
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
|
||||||
if (strcmp(name, pTopic->name) == 0) {
|
if (strcmp(name, pTopic->name) == 0) {
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
found = true;
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
break;
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
|
||||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
|
||||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (found){
|
||||||
|
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
|
||||||
|
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pReq->info);
|
||||||
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||||
|
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||||
|
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||||
|
code = -1;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
sz = taosArrayGetSize(pConsumer->rebNewTopics);
|
sz = taosArrayGetSize(pConsumer->rebNewTopics);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
char *name = taosArrayGetP(pConsumer->rebNewTopics, i);
|
char *name = taosArrayGetP(pConsumer->rebNewTopics, i);
|
||||||
if (strcmp(name, pTopic->name) == 0) {
|
if (strcmp(name, pTopic->name) == 0) {
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
|
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
|
||||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||||
return -1;
|
code = -1;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -727,45 +768,22 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i);
|
char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i);
|
||||||
if (strcmp(name, pTopic->name) == 0) {
|
if (strcmp(name, pTopic->name) == 0) {
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)",
|
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)",
|
||||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||||
return -1;
|
code = -1;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pSdb, pConsumer);
|
sdbRelease(pSdb, pConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
|
code = mndDropSubByTopic(pMnode, pTrans, dropReq.name);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
if (code < 0) {
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "drop-topic");
|
|
||||||
if (pTrans == NULL) {
|
|
||||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
goto end;
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, pTopic->db, NULL);
|
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
|
||||||
|
|
||||||
// TODO check if rebalancing
|
|
||||||
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
|
||||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTopic->ntbUid != 0) {
|
if (pTopic->ntbUid != 0) {
|
||||||
|
@ -791,25 +809,25 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
action.pCont = buf;
|
action.pCont = buf;
|
||||||
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
|
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
|
||||||
action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO;
|
action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO;
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
code = mndTransAppendRedoAction(pTrans, &action);
|
||||||
|
if (code != 0) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
mndTransDrop(pTrans);
|
goto end;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
|
code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
|
||||||
|
|
||||||
|
end:
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
|
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
|
@ -792,6 +792,22 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pNew->conflict == TRN_CONFLICT_TOPIC) {
|
||||||
|
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||||
|
if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
||||||
|
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
||||||
|
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||||
|
if (pTrans->conflict == TRN_CONFLICT_TOPIC ) {
|
||||||
|
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true;
|
||||||
|
}
|
||||||
|
if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
||||||
|
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) conflict = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (conflict) {
|
if (conflict) {
|
||||||
mError("trans:%d, db:%s stb:%s type:%d, can't execute since conflict with trans:%d db:%s stb:%s type:%d",
|
mError("trans:%d, db:%s stb:%s type:%d, can't execute since conflict with trans:%d db:%s stb:%s type:%d",
|
||||||
pNew->id, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname,
|
pNew->id, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname,
|
||||||
|
|
|
@ -880,7 +880,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
|
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
|
||||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||||
atomic_store_32(&pHandle->epoch, 0);
|
atomic_store_32(&pHandle->epoch, 0);
|
||||||
|
|
||||||
tqUnregisterPushHandle(pTq, pHandle);
|
tqUnregisterPushHandle(pTq, pHandle);
|
||||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue