fix(stream): add task update trans conflict level.

This commit is contained in:
Haojun Liao 2024-04-23 18:07:00 +08:00
parent 10c792efa2
commit 0c7373509e
2 changed files with 8 additions and 4 deletions

View File

@ -98,7 +98,8 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p
mDebug("not conflict with checkpoint trans, name:%s, continue create trans", pTransName);
}
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) {
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT;

View File

@ -195,12 +195,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
const char* idstr = pTask->id.idStr;
if (pMeta->updateInfo.transId != req.transId) {
pMeta->updateInfo.transId = req.transId;
tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", idstr, req.transId);
ASSERT(req.transId > pMeta->updateInfo.transId);
tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr,
vgId, req.transId, pMeta->updateInfo.transId);
// info needs to be kept till the new trans to update the nodeEp arrived.
taosHashClear(pMeta->updateInfo.pTasks);
pMeta->updateInfo.transId = req.transId;
} else {
tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", idstr, req.transId);
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId);
}
// duplicate update epset msg received, discard this redundant message