From 551cd7cdc8e166db682f1f123d6932aa3fdb401d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 14 Jun 2023 21:14:00 +0800 Subject: [PATCH] trigger checkpoint --- source/dnode/mnode/impl/src/mndStream.c | 132 ++++++++---------------- 1 file changed, 42 insertions(+), 90 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7faaa42ffe..236bf9ed6f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -842,70 +842,22 @@ static int32_t mndCreateCheckpoint(SMnode *pMnode, int32_t vgId, SList *pStreamL return 0; } static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - SStreamObj *pStream = NULL; - - // listEleSize(); - - // iterate all stream obj - // SHashObj *vgIds = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); - while (1) { - pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); - if (pIter == NULL) break; - - // taosRLockLatch(&pStream->lock); - // for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) { - // SArray *pLevel = taosArrayGetP(pStream->tasks, i); - // SStreamTask *pTask = taosArrayGetP(pLevel, 0); - // if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - // int32_t sz = taosArrayGetSize(pLevel); - // SList *list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId)); - // if (list == NULL) { - // SList tlist; - // tdListInit(&tlist, TSDB_STREAM_FNAME_LEN); - // taosHashPut(vgIds, &pTask->nodeId, sizeof(pTask->nodeId), &tlist, sizeof(tlist)); - // list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId)); - // } - // tdListAppend(list, (void *)pStream->name); - // } - // } - // taosRUnLockLatch(&pStream->lock); - - if (pIter == NULL) break; - // incr tick - int64_t currentTick = atomic_add_fetch_64(&pStream->currentTick, 1); - // if >= checkpointFreq, build msg TDMT_MND_STREAM_BEGIN_CHECKPOINT, put into write q - // if (currentTick >= pStream->checkpointFreq) { - atomic_store_64(&pStream->currentTick, 0); - SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); - - pMsg->streamId = pStream->uid; - pMsg->checkpointId = tGenIdPI64(); - memcpy(pMsg->streamName, pStream->name, TSDB_STREAM_FNAME_LEN); - - SRpcMsg rpcMsg = { - .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, - .pCont = pMsg, - .contLen = sizeof(SMStreamDoCheckpointMsg), - }; - - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + if (sdbGetSize(pSdb, SDB_STREAM) <= 0) { + return 0; } - // void *vgIter = taosHashIterate(vgIds, NULL); - // size_t klen = 0; - // int64_t checkpointId = tGenIdPI64(); - // while (vgIter) { - // int32_t *key = (int32_t *)taosHashGetKey(vgIter, &klen); - // SList *val = (SList *)vgIter; - - // mndCreateCheckpoint(pMnode, *key, val); - // vgIter = taosHashIterate(vgIds, vgIter); - // } - // taosHashCleanup(vgIds); + int64_t checkpointId = tGenIdPI64(); + SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); + pMsg->checkpointId = checkpointId; + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, + .pCont = pMsg, + .contLen = sizeof(SMStreamDoCheckpointMsg), + }; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); return 0; } @@ -992,39 +944,24 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in return 0; } - -static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - - SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; - - SStreamObj *pStream = mndAcquireStream(pMnode, pMsg->streamName); - - if (pStream == NULL || pStream->uid != pMsg->streamId) { - mError("failed to checkpoint since stream %s not found", pMsg->streamName); - return -1; - } - - // build new transaction: - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "stream-checkpoint"); +static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, SHashObj *vgIds, + int64_t checkpointId) { + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); if (pTrans == NULL) return -1; mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (mndTrancCheckConflict(pMnode, pTrans) != 0) { - mError("failed to checkpoint since stream %s", tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); - - mndReleaseStream(pMnode, pStream); + mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId, + tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); mndTransDrop(pTrans); return -1; } - taosRLockLatch(&pStream->lock); // 1. redo action: broadcast checkpoint source msg for all source vg int32_t totLevel = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < totLevel; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); SStreamTask *pTask = taosArrayGetP(pLevel, 0); - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE && NULL == taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId))) { int32_t sz = taosArrayGetSize(pLevel); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); @@ -1032,17 +969,15 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->nodeId); if (pVgObj == NULL) { taosRUnLockLatch(&pStream->lock); - mndReleaseStream(pMnode, pStream); mndTransDrop(pTrans); return -1; } void *buf; int32_t tlen; - if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask, pMsg) < 0) { + if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->nodeId, checkpointId) < 0) { mndReleaseVgroup(pMnode, pVgObj); taosRUnLockLatch(&pStream->lock); - mndReleaseStream(pMnode, pStream); mndTransDrop(pTrans); return -1; } @@ -1062,17 +997,18 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { mndTransDrop(pTrans); return -1; } + taosHashPut(vgIds, &pTask->nodeId, sizeof(pTask->nodeId), &pTask->nodeId, sizeof(pTask->nodeId)); } } } // 2. reset tick + pStream->checkpointFreq = checkpointId; atomic_store_64(&pStream->currentTick, 0); // 3. commit log: stream checkpoint info - pStream->checkpointFreq = taosGetTimestampMs(); taosRUnLockLatch(&pStream->lock); - // code condtion + // // code condtion SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); if (pCommitRaw == NULL) { @@ -1094,15 +1030,31 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { mError("failed to prepare trans rebalance since %s", terrstr()); goto _ERR; } - - mndReleaseStream(pMnode, pStream); mndTransDrop(pTrans); return 0; _ERR: - mndReleaseStream(pMnode, pStream); mndTransDrop(pTrans); return -1; } +static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SStreamObj *pStream = NULL; + int32_t code = 0; + + SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; + int64_t checkpointId = pMsg->checkpointId; + + SHashObj *vgIds = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) break; + code = mndProcessStreamCheckpointTrans(pMnode, pStream, vgIds, checkpointId); + sdbRelease(pSdb, pStream); + } + return 0; +} static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node;