From e588640e02175580e118e6820c34d6a08ebe42f9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 13 Jul 2023 20:46:58 +0800 Subject: [PATCH] fix recover error --- source/dnode/mnode/impl/src/mndStream.c | 105 ++++++++++++++++++++++-- 1 file changed, 99 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8ce9ff6def..568ca530a5 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1009,7 +1009,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre /*A(pTask->info.nodeId > 0);*/ SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); if (pVgObj == NULL) { - taosRUnLockLatch(&pStream->lock); + taosWUnLockLatch(&pStream->lock); mndTransDrop(pTrans); return -1; } @@ -1019,7 +1019,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, pTask->id.taskId) < 0) { mndReleaseVgroup(pMnode, pVgObj); - taosRUnLockLatch(&pStream->lock); + taosWUnLockLatch(&pStream->lock); mndTransDrop(pTrans); return -1; } @@ -1034,7 +1034,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); - taosRUnLockLatch(&pStream->lock); + taosWUnLockLatch(&pStream->lock); mndReleaseStream(pMnode, pStream); mndTransDrop(pTrans); return -1; @@ -1079,6 +1079,78 @@ _ERR: mndTransDrop(pTrans); return -1; } + +static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, + int64_t checkpointId) { + taosWLockLatch(&pStream->lock); + + int32_t totLevel = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < totLevel; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + SStreamTask *pTask = taosArrayGetP(pLevel, 0); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int32_t sz = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < sz; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + /*A(pTask->info.nodeId > 0);*/ + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + if (pVgObj == NULL) { + taosWUnLockLatch(&pStream->lock); + return -1; + } + + void *buf; + int32_t tlen; + if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, + pTask->id.taskId) < 0) { + mndReleaseVgroup(pMnode, pVgObj); + taosWUnLockLatch(&pStream->lock); + return -1; + } + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj); + action.pCont = buf; + action.contLen = tlen; + action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE; + + mndReleaseVgroup(pMnode, pVgObj); + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(buf); + taosWUnLockLatch(&pStream->lock); + return -1; + } + } + } + } + + pStream->checkpointFreq = checkpointId; + pStream->checkpointId = checkpointId; + pStream->checkpointFreq = taosGetTimestampMs(); + atomic_store_64(&pStream->currentTick, 0); + // 3. commit log: stream checkpoint info + pStream->version = pStream->version + 1; + + taosWUnLockLatch(&pStream->lock); + + SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); + if (pCommitRaw == NULL) { + mError("failed to prepare trans rebalance since %s", terrstr()); + return -1; + } + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + sdbFreeRaw(pCommitRaw); + mError("failed to prepare trans rebalance since %s", terrstr()); + return -1; + } + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) { + sdbFreeRaw(pCommitRaw); + mError("failed to prepare trans rebalance since %s", terrstr()); + return -1; + } + return 0; +} static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1089,16 +1161,37 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; int64_t checkpointId = pMsg->checkpointId; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); + if (pTrans == NULL) { + mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return -1; + } + mndTransSetDbName(pTrans, "checkpoint", "checkpoint"); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { + mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, + tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); + mndTransDrop(pTrans); + return -1; + } + while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; - code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId); + + code = mndAddStreamCheckpointToTrans(pTrans, pStream, pMnode, checkpointId); if (code == -1) { - mInfo("stream:%s failed to do checkpoint, reason: last checkpoint not finished", pStream->name); + sdbRelease(pSdb, pStream); + break; } sdbRelease(pSdb, pStream); } - return 0; + if (code == 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("failed to prepre trans rebalance since %s", terrstr()); + } + } + mndTransDrop(pTrans); + return code; } static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {