fix:transaction in tmq
This commit is contained in:
parent
be74370e98
commit
49216ffa54
|
@ -99,6 +99,8 @@ typedef enum {
|
||||||
TRN_CONFLICT_GLOBAL = 1,
|
TRN_CONFLICT_GLOBAL = 1,
|
||||||
TRN_CONFLICT_DB = 2,
|
TRN_CONFLICT_DB = 2,
|
||||||
TRN_CONFLICT_DB_INSIDE = 3,
|
TRN_CONFLICT_DB_INSIDE = 3,
|
||||||
|
TRN_CONFLICT_TOPIC = 4,
|
||||||
|
TRN_CONFLICT_TOPIC_INSIDE = 5,
|
||||||
} ETrnConflct;
|
} ETrnConflct;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
|
|
@ -119,6 +119,27 @@ void mndRebCntDec() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode) {
|
||||||
|
int32_t numOfTopics = taosArrayGetSize(pTopicList);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||||
|
char *pOneTopic = taosArrayGetP(pTopicList, i);
|
||||||
|
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic);
|
||||||
|
if (pTopic == NULL) { // terrno has been set by callee function
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransSetDbName(pTrans, pOneTopic, NULL);
|
||||||
|
if(mndTransCheckConflict(pMnode, pTrans) != 0){
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
|
SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
|
||||||
|
@ -142,10 +163,13 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode) != 0){
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
||||||
|
@ -179,9 +203,11 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pMsg, "clear-csm");
|
||||||
if (pTrans == NULL) goto FAIL;
|
if (pTrans == NULL) goto FAIL;
|
||||||
|
if (validateTopics(pTrans, pConsumer->assignedTopics, pMnode) != 0){
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
// this is the drop action, not the update action
|
// this is the drop action, not the update action
|
||||||
if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
||||||
|
@ -577,27 +603,6 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
|
|
||||||
int32_t numOfTopics = taosArrayGetSize(pTopicList);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTopics; i++) {
|
|
||||||
char *pOneTopic = taosArrayGetP(pTopicList, i);
|
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic);
|
|
||||||
if (pTopic == NULL) { // terrno has been set by callee function
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
|
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
|
||||||
|
|
||||||
static void freeItem(void *param) {
|
static void freeItem(void *param) {
|
||||||
|
@ -636,12 +641,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// check topic existence
|
// check topic existence
|
||||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
goto _over;
|
goto _over;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user);
|
code = validateTopics(pTrans, pTopicList, pMnode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _over;
|
goto _over;
|
||||||
}
|
}
|
||||||
|
|
|
@ -553,13 +553,17 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pMsg, "tmq-reb");
|
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
|
||||||
|
char cgroup[TSDB_CGROUP_LEN] = {0};
|
||||||
|
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
|
||||||
|
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
nodesDestroyNode((SNode*)pPlan);
|
nodesDestroyNode((SNode*)pPlan);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
|
mndTransSetDbName(pTrans, topic, cgroup);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
nodesDestroyNode((SNode*)pPlan);
|
nodesDestroyNode((SNode*)pPlan);
|
||||||
|
@ -587,10 +591,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
|
|
||||||
char cgroup[TSDB_CGROUP_LEN] = {0};
|
|
||||||
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
|
|
||||||
|
|
||||||
// 3. commit log: consumer to update status and epoch
|
// 3. commit log: consumer to update status and epoch
|
||||||
// 3.1 set touched consumer
|
// 3.1 set touched consumer
|
||||||
int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
|
int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
|
||||||
|
@ -802,6 +802,19 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "drop-cgroup");
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
|
||||||
|
code = -1;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransSetDbName(pTrans, dropReq.topic, dropReq.cgroup);
|
||||||
|
code = mndTransCheckConflict(pMnode, pTrans);
|
||||||
|
if (code != 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SMqConsumerObj *pConsumer;
|
SMqConsumerObj *pConsumer;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -816,13 +829,6 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
||||||
sdbRelease(pMnode->pSdb, pConsumer);
|
sdbRelease(pMnode->pSdb, pConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
|
|
||||||
if (pTrans == NULL) {
|
|
||||||
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
|
|
||||||
code = -1;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
|
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
|
||||||
|
|
||||||
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
||||||
|
|
|
@ -383,14 +383,15 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
SQueryPlan *pPlan = NULL;
|
SQueryPlan *pPlan = NULL;
|
||||||
SMqTopicObj topicObj = {0};
|
SMqTopicObj topicObj = {0};
|
||||||
|
|
||||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "create-topic");
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "create-topic");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
mndTransSetDbName(pTrans, pCreate->name, NULL);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
code = mndTransCheckConflict(pMnode, pTrans);
|
||||||
|
if (code != 0) {
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name);
|
mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name);
|
||||||
|
@ -661,6 +662,11 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
SMqTopicObj *pTopic = NULL;
|
SMqTopicObj *pTopic = NULL;
|
||||||
STrans *pTrans = NULL;
|
STrans *pTrans = NULL;
|
||||||
|
|
||||||
|
if (!mndRebTryStart()) {
|
||||||
|
mInfo("mq rebalance already in progress, do nothing");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -680,14 +686,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "drop-topic");
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "drop-topic");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||||
code = -1;
|
code = -1;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, pTopic->db, NULL);
|
mndTransSetDbName(pTrans, pTopic->name, NULL);
|
||||||
code = mndTransCheckConflict(pMnode, pTrans);
|
code = mndTransCheckConflict(pMnode, pTrans);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
goto end;
|
goto end;
|
||||||
|
|
|
@ -792,6 +792,22 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pNew->conflict == TRN_CONFLICT_TOPIC) {
|
||||||
|
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||||
|
if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
||||||
|
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
||||||
|
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||||
|
if (pTrans->conflict == TRN_CONFLICT_TOPIC ) {
|
||||||
|
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true;
|
||||||
|
}
|
||||||
|
if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
||||||
|
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) conflict = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (conflict) {
|
if (conflict) {
|
||||||
mError("trans:%d, db:%s stb:%s type:%d, can't execute since conflict with trans:%d db:%s stb:%s type:%d",
|
mError("trans:%d, db:%s stb:%s type:%d, can't execute since conflict with trans:%d db:%s stb:%s type:%d",
|
||||||
pNew->id, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname,
|
pNew->id, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname,
|
||||||
|
|
|
@ -879,20 +879,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
} else {
|
} else {
|
||||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
|
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
|
||||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||||
// atomic_add_fetch_32(&pHandle->epoch, 1);
|
atomic_store_32(&pHandle->epoch, 0);
|
||||||
|
|
||||||
// kill executing task
|
|
||||||
// if(tqIsHandleExec(pHandle)) {
|
|
||||||
// qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
|
|
||||||
// if (pTaskInfo != NULL) {
|
|
||||||
// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
|
||||||
// qStreamCloseTsdbReader(pTaskInfo);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// remove if it has been register in the push manager, and return one empty block to consumer
|
|
||||||
tqUnregisterPushHandle(pTq, pHandle);
|
tqUnregisterPushHandle(pTq, pHandle);
|
||||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue