fix(stream): add conflict check for nodeUpdate
This commit is contained in:
parent
849aaf8d4f
commit
64eee4bbd2
|
@ -2331,11 +2331,27 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
|
|||
|
||||
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
// check all streams that involved this vnode should update the epset info
|
||||
SStreamObj *pStream = NULL;
|
||||
void * pIter = NULL;
|
||||
STrans * pTrans = NULL;
|
||||
void *pIter = NULL;
|
||||
STrans *pTrans = NULL;
|
||||
|
||||
// conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
|
||||
while(1) {
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, true);
|
||||
sdbRelease(pSdb, pStream);
|
||||
|
||||
if (conflict) {
|
||||
mWarn("nodeUpdate trans in progress, current nodeUpdate ignored");
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
|
@ -2351,6 +2367,8 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
|||
sdbCancelFetch(pSdb, pIter);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
|
||||
}
|
||||
|
||||
void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
|
||||
|
@ -2597,7 +2615,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
|||
|
||||
code = mndProcessVgroupChange(pMnode, &changeInfo);
|
||||
|
||||
// keep the new vnode snapshot
|
||||
// keep the new vnode snapshot if success
|
||||
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mDebug("create trans successfully, update cached node list");
|
||||
taosArrayDestroy(execInfo.pNodeList);
|
||||
|
|
|
@ -94,8 +94,8 @@ bool streamTransConflictOtherTrans(SMnode* pMnode, int64_t streamUid, const char
|
|||
tInfo.name);
|
||||
return true;
|
||||
}
|
||||
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) ||
|
||||
(strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0)) {
|
||||
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
|
||||
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) {
|
||||
mWarn("conflict with other transId:%d streamUid:%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
|
||||
tInfo.name);
|
||||
return true;
|
||||
|
|
|
@ -73,7 +73,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
|
|||
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
// to avoid deadlock
|
||||
// todo: fix it, to avoid deadlock in: tqStreamTaskProcessUpdateReq
|
||||
streamMetaUpdateTaskDownstreamStatus(pMeta, id.streamId, id.taskId, initTs, startTs, true);
|
||||
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
|
|
Loading…
Reference in New Issue