From 07e9d56e077a538a95c626e0fb9b9fecf7d41e38 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 25 Oct 2023 11:42:46 +0800 Subject: [PATCH] fix(stream): set the retrycode for checkpoint source transaction. --- source/dnode/mnode/impl/src/mndStream.c | 27 ++++++++++++++----------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a2aa56dd6e..a4f69a1064 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -84,7 +84,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name); static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); -static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); +static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, + int32_t retryCode); static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); @@ -516,7 +517,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { STransAction action = {0}; action.mTraceId = pTrans->mTraceId; - initTransAction(&action, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet); + initTransAction(&action, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); return -1; @@ -688,7 +689,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { pReq->streamId = pTask->id.streamId; STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &pTask->info.epSet); + initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &pTask->info.epSet, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -1068,7 +1069,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream STransAction action = {0}; SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); - initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset); + initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { @@ -1646,7 +1647,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { pReq->streamId = pTask->id.streamId; STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &pTask->info.epSet); + initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &pTask->info.epSet, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -1779,7 +1780,7 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig pReq->igUntreated = igUntreated; STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &pTask->info.epSet); + initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &pTask->info.epSet, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -1959,19 +1960,21 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans) { return 0; } -void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset) { +void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, + int32_t retryCode) { pAction->epSet = *pEpset; pAction->contLen = contLen; pAction->pCont = pCont; pAction->msgType = msgType; + pAction->retryCode = retryCode; } // todo extract method: traverse stream tasks // build trans to update the epset -static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans* pTrans) { - mDebug("start to build stream:0x%" PRIx64 " tasks epset update", pStream->uid); +static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { + mDebug("start to build stream:0x%" PRIx64 " tasks epset update", pStream->uid); - taosWLockLatch(&pStream->lock); + taosWLockLatch(&pStream->lock); int32_t numOfLevels = taosArrayGetSize(pStream->tasks); for (int32_t j = 0; j < numOfLevels; ++j) { @@ -1987,7 +1990,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); STransAction action = {0}; - initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet); + initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pBuf); taosWUnLockLatch(&pStream->lock); @@ -2472,7 +2475,7 @@ int32_t createStreamResetStatusTrans(SMnode* pMnode, SStreamObj* pStream) { pReq->streamId = pTask->id.streamId; STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &pTask->info.epSet); + initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &pTask->info.epSet, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); taosWUnLockLatch(&pStream->lock);