From 64eee4bbd2f799f1cb843d0b944fbbeef9863d77 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 20 Dec 2023 19:06:32 +0800 Subject: [PATCH] fix(stream): add conflict check for nodeUpdate --- source/dnode/mnode/impl/src/mndStream.c | 30 ++++++++++++++++---- source/dnode/mnode/impl/src/mndStreamTrans.c | 4 +-- source/libs/stream/src/streamStart.c | 2 +- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b8ecf66686..0e593a5109 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2330,12 +2330,28 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { } static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { - SSdb *pSdb = pMnode->pSdb; - - // check all streams that involved this vnode should update the epset info + SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; - void * pIter = NULL; - STrans * pTrans = NULL; + void *pIter = NULL; + STrans *pTrans = NULL; + + // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool + while(1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, true); + sdbRelease(pSdb, pStream); + + if (conflict) { + mWarn("nodeUpdate trans in progress, current nodeUpdate ignored"); + sdbCancelFetch(pSdb, pIter); + return -1; + } + } + while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -2351,6 +2367,8 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange sdbCancelFetch(pSdb, pIter); return terrno; } + + mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid); } void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); @@ -2597,7 +2615,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { code = mndProcessVgroupChange(pMnode, &changeInfo); - // keep the new vnode snapshot + // keep the new vnode snapshot if success if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { mDebug("create trans successfully, update cached node list"); taosArrayDestroy(execInfo.pNodeList); diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 0db3cb0a8a..a7d4703c8c 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -94,8 +94,8 @@ bool streamTransConflictOtherTrans(SMnode* pMnode, int64_t streamUid, const char tInfo.name); return true; } - } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || - (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0)) { + } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) || + (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) { mWarn("conflict with other transId:%d streamUid:%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, tInfo.name); return true; diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 94f635a480..d94aa80369 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -73,7 +73,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { taosThreadMutexUnlock(&pTask->lock); - // to avoid deadlock + // todo: fix it, to avoid deadlock in: tqStreamTaskProcessUpdateReq streamMetaUpdateTaskDownstreamStatus(pMeta, id.streamId, id.taskId, initTs, startTs, true); taosThreadMutexLock(&pTask->lock);