diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b53a601b12..92035101f6 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -110,15 +110,15 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, co int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); -int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); +int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t mndProcessStreamHb(SRpcMsg *pReq); void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); int32_t initStreamNodeList(SMnode *pMnode); -int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated); -int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated); +int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f4ca3dd454..0392f51f51 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1577,21 +1577,6 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } -static int32_t mndPersistStreamLog(STrans *pTrans, SStreamObj *pStream, int8_t status) { - taosWLockLatch(&pStream->lock); - pStream->status = status; - SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); - - taosWUnLockLatch(&pStream->lock); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - return -1; - } - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - return 0; -} - static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; @@ -1647,7 +1632,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid); // if nodeUpdate happened, not send pause trans - if (mndPauseStreamTasks(pMnode, pTrans, pStream) < 0) { + if (mndStreamSetPauseAction(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1655,12 +1640,18 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } // pause stream - if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__PAUSE) < 0) { + taosWLockLatch(&pStream->lock); + pStream->status = STREAM_STATUS__PAUSE; + if (mndPersistTransLog(pStream, pTrans,SDB_STATUS_READY) < 0) { + taosWUnLockLatch(&pStream->lock); + sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return -1; } + taosWUnLockLatch(&pStream->lock); + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1724,7 +1715,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid); // resume all tasks - if (mndResumeStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) { + if (mndStreamSetResumeAction(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) { mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1732,12 +1723,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } // resume stream - if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__NORMAL) < 0) { + taosWLockLatch(&pStream->lock); + pStream->status = STREAM_STATUS__NORMAL; + if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY) < 0) { + taosWUnLockLatch(&pStream->lock); + sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return -1; } + taosWUnLockLatch(&pStream->lock); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1862,7 +1858,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, pStream->name, pTrans->id); - int32_t code = createStreamUpdateTrans(pStream, pChangeInfo, pTrans); + int32_t code = mndStreamSetUpdateEpsetAction(pStream, pChangeInfo, pTrans); // todo: not continue, drop all and retry again if (code != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 3a70bc9c8b..0a7397827e 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -256,7 +256,7 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) } int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode) { + int32_t retryCode) { STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode}; return mndTransAppendRedoAction(pTrans, &action); } @@ -354,8 +354,8 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha // todo extract method: traverse stream tasks // build trans to update the epset -int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { - mDebug("start to build stream:0x%" PRIx64 " tasks epset update", pStream->uid); +int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { + mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid); taosWLockLatch(&pStream->lock); int32_t numOfLevels = taosArrayGetSize(pStream->tasks); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 74afb4864d..2ee73528e0 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -143,7 +143,7 @@ int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t } } -static int32_t doResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) { +static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) { SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); if (pReq == NULL) { mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq), @@ -200,14 +200,14 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) { return num; } -int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) { +int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) { int32_t size = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < size; i++) { SArray *pTasks = taosArrayGetP(pStream->tasks, i); int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (doResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) { + if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) { return -1; } @@ -219,7 +219,7 @@ int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream return 0; } -static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { +static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); if (pReq == NULL) { mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), @@ -250,7 +250,7 @@ static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pT return 0; } -int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { +int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { SArray *tasks = pStream->tasks; int32_t size = taosArrayGetSize(tasks); @@ -259,7 +259,8 @@ int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (doPauseStreamTask(pMnode, pTrans, pTask) < 0) { + + if (doSetPauseAction(pMnode, pTrans, pTask) < 0) { return -1; }