fix:[TD-30365] ci case error & drop topic error if vnode is splitted

This commit is contained in:
wangmm0220 2024-06-11 17:24:29 +08:00
parent 0e337d3418
commit 70db803aec
8 changed files with 67 additions and 63 deletions

View File

@ -102,8 +102,8 @@ typedef enum {
TRN_CONFLICT_GLOBAL = 1,
TRN_CONFLICT_DB = 2,
TRN_CONFLICT_DB_INSIDE = 3,
TRN_CONFLICT_TOPIC = 4,
TRN_CONFLICT_TOPIC_INSIDE = 5,
// TRN_CONFLICT_TOPIC = 4,
// TRN_CONFLICT_TOPIC_INSIDE = 5,
TRN_CONFLICT_ARBGROUP = 6,
} ETrnConflct;

View File

@ -91,7 +91,7 @@ void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SR
}
}
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser,
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser,
bool enableReplay) {
SMqTopicObj *pTopic = NULL;
int32_t code = 0;
@ -135,11 +135,6 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
}
}
mndTransSetDbName(pTrans, pOneTopic, NULL);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
code = -1;
goto FAILED;
}
mndReleaseTopic(pMnode, pTopic);
}
@ -177,12 +172,12 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
goto END;
}
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm");
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
if (pTrans == NULL) {
code = -1;
goto END;
}
code = validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false);
code = validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false);
if (code != 0) {
goto END;
}
@ -675,13 +670,13 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
goto _over;
}
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe");
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
if (pTrans == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _over;
}
code = validateTopics(pTrans, subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay);
code = validateTopics(subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}

View File

@ -618,13 +618,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
char cgroup[TSDB_CGROUP_LEN] = {0};
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb");
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
if (pTrans == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
mndTransSetDbName(pTrans, topic, cgroup);
mndTransSetDbName(pTrans, pOutput->pSub->dbName, cgroup);
code = mndTransCheckConflict(pMnode, pTrans);
if (code != 0) {
goto END;
@ -908,22 +908,26 @@ END:
}
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){
// iter all vnode to delete handle
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId);
if (pVgObj == NULL) {
mError("sendDeleteSubToVnode %s failed since vg %d doesn't exist", pSub->key, pVgEp->vgId);
void* pIter = NULL;
SVgObj* pVgObj = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgObj);
if (pIter == NULL) {
break;
}
if (!mndVgroupInDb(pVgObj, pSub->dbUid)) {
sdbRelease(pMnode->pSdb, pVgObj);
continue;
}
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
if(pReq == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pMnode->pSdb, pVgObj);
return -1;
}
pReq->head.vgId = htonl(pVgEp->vgId);
pReq->vgId = pVgEp->vgId;
pReq->head.vgId = htonl(pVgObj->vgId);
pReq->vgId = pVgObj->vgId;
pReq->consumerId = -1;
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
@ -934,7 +938,7 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran
action.msgType = TDMT_VND_TMQ_DELETE_SUB;
action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;
mndReleaseVgroup(pMnode, pVgObj);
sdbRelease(pMnode->pSdb, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
@ -996,7 +1000,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
goto end;
}
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "drop-cgroup");
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "drop-cgroup");
if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
code = -1;
@ -1004,7 +1008,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
}
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
mndTransSetDbName(pTrans, dropReq.topic, dropReq.cgroup);
mndTransSetDbName(pTrans, pSub->dbName, dropReq.cgroup);
code = mndTransCheckConflict(pMnode, pTrans);
if (code != 0) {
goto end;

View File

@ -422,14 +422,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
SQueryPlan *pPlan = NULL;
SMqTopicObj topicObj = {0};
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "create-topic");
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "create-topic");
if (pTrans == NULL) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
code = -1;
goto _OUT;
}
mndTransSetDbName(pTrans, pCreate->name, NULL);
mndTransSetDbName(pTrans, pDb->name, NULL);
code = mndTransCheckConflict(pMnode, pTrans);
if (code != 0) {
goto _OUT;
@ -779,14 +779,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
}
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "drop-topic");
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-topic");
if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
code = -1;
goto end;
}
mndTransSetDbName(pTrans, pTopic->name, NULL);
mndTransSetDbName(pTrans, pTopic->db, NULL);
code = mndTransCheckConflict(pMnode, pTrans);
if (code != 0) {
goto end;

View File

@ -830,26 +830,26 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
}
}
if (pNew->conflict == TRN_CONFLICT_TOPIC) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
}
}
if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_TOPIC) {
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
}
if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0)
conflict = true;
}
}
// if (pNew->conflict == TRN_CONFLICT_TOPIC) {
// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
// if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
// }
// }
// if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
// if (pTrans->conflict == TRN_CONFLICT_TOPIC) {
// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
// }
// if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0)
// conflict = true;
// }
// }
if (pNew->conflict == TRN_CONFLICT_ARBGROUP) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) {
void *pIter = taosHashIterate(pNew->arbGroupIds, NULL);
pIter = taosHashIterate(pNew->arbGroupIds, NULL);
while (pIter != NULL) {
int32_t groupId = *(int32_t *)pIter;
if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) {

View File

@ -133,7 +133,7 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
int32_t tqMetaGetHandle(STQ* pTq, const char* key);
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t snapshotVer);
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle);
STqOffsetStore* tqOffsetOpen(STQ* pTq);
int32_t tqMetaTransform(STQ* pTq);

View File

@ -649,7 +649,19 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
STqHandle* pHandle = NULL;
while (1) {
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle) {
break;
}
taosRLockLatch(&pTq->lock);
ret = tqMetaGetHandle(pTq, req.subKey);
taosRUnLockLatch(&pTq->lock);
if (ret < 0) {
break;
}
}
if (pHandle == NULL) {
if (req.oldConsumerId != -1) {
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
@ -660,7 +672,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
goto end;
}
STqHandle handle = {0};
ret = tqCreateHandle(pTq, &req, &handle, walGetCommittedVer(pTq->pVnode->pWal));
ret = tqCreateHandle(pTq, &req, &handle);
if (ret < 0) {
tqDestroyTqHandle(&handle);
goto end;
@ -684,17 +696,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, 0);
tqUnregisterPushHandle(pTq, pHandle);
// update handle to avoid req->qmsg changed if spilt vnode is failed
STqHandle handle = {0};
ret = tqCreateHandle(pTq, &req, &handle, pHandle->snapshotVer);
if (ret < 0) {
tqDestroyTqHandle(&handle);
taosWUnLockLatch(&pTq->lock);
goto end;
}
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
}
taosWUnLockLatch(&pTq->lock);
break;

View File

@ -346,7 +346,7 @@ end:
return code;
}
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t snapshotVer){
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
int32_t vgId = TD_VID(pTq->pVnode);
memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
@ -364,12 +364,12 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t sn
handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg);
}
handle->snapshotVer = snapshotVer;
handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);
if(buildHandle(pTq, handle) < 0){
return -1;
}
tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, handle->consumerId, vgId, handle->snapshotVer);
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}