diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5d41e1506c..0a107518df 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1931,11 +1931,6 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange sdbCancelFetch(pSdb, pIter); return terrno = code; } - - code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid); - if (code) { - mError("failed to register trans, transId:%d, and continue", pTrans->id); - } } if (!includeAllNodes) { @@ -1951,6 +1946,12 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, pStream->name, pTrans->id); + // NOTE: for each stream, we register one trans entry for task update + code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid); + if (code) { + mError("failed to register trans, transId:%d, and continue", pTrans->id); + } + code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans); // todo: not continue, drop all and retry again diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 905a73ad48..a1f87f4d72 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -35,7 +35,12 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) size_t keyLen = 0; void *pIter = NULL; SArray *pList = taosArrayInit(4, sizeof(SKeyInfo)); - int32_t num = 0; + int32_t numOfChkpt = 0; + int32_t numOfTaskUpdate = 0; + + if (pNumOfActiveChkpt != NULL) { + *pNumOfActiveChkpt = 0; + } if (pList == NULL) { return terrno; @@ -50,15 +55,15 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) void *pKey = taosHashGetKey(pEntry, &keyLen); // key is the name of src/dst db name SKeyInfo info = {.pKey = pKey, .keyLen = keyLen}; - mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name, - pEntry->startTime); + mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId, + pEntry->streamId, pEntry->name, pEntry->startTime); void* p = taosArrayPush(pList, &info); if (p == NULL) { return terrno; } } else { if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) { - num++; + numOfChkpt++; } mndReleaseTrans(pMnode, pTrans); } @@ -78,48 +83,34 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) } } - mDebug("clear %d finished stream-trans, remained:%d, active checkpoint trans:%d", size, - taosHashGetSize(execInfo.transMgmt.pDBTrans), num); + mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d, update trans:%d", size, + taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt, numOfTaskUpdate); taosArrayDestroy(pList); if (pNumOfActiveChkpt != NULL) { - *pNumOfActiveChkpt = num; + *pNumOfActiveChkpt = numOfChkpt; } return 0; } -// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream. -// For a given stream: -// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans. -// 2. create/drop/reset/update trans are conflict with any other trans. -int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) { - if (lock) { - streamMutexLock(&execInfo.lock); - } - +static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) { int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); if (num <= 0) { - if (lock) { - streamMutexUnlock(&execInfo.lock); - } return 0; } + // if any task updates exist, any other stream trans are not allowed to be created int32_t code = mndStreamClearFinishedTrans(pMnode, NULL); if (code) { - mError("failed to clear finish trans, code:%s", tstrerror(code)); + mError("failed to clear finish trans, code:%s, and continue", tstrerror(code)); } SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId)); if (pEntry != NULL) { SStreamTransInfo tInfo = *pEntry; - if (lock) { - streamMutexUnlock(&execInfo.lock); - } - if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) && (strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) { @@ -141,11 +132,25 @@ int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId); } + return TSDB_CODE_SUCCESS; +} + +// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream. +// For a given stream: +// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans. +// 2. create/drop/reset/update trans are conflict with any other trans. +int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) { + if (lock) { + streamMutexLock(&execInfo.lock); + } + + int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName); + if (lock) { streamMutexUnlock(&execInfo.lock); } - return 0; + return code; } int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {