From 67dc52d2d624107e24d97e6657dc8e050124ea2e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 19 Oct 2023 15:19:52 +0800 Subject: [PATCH] refactor stream checkpoint --- source/dnode/mnode/impl/src/mndStream.c | 225 ++++++++++-------------- 1 file changed, 97 insertions(+), 128 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 049b4e737a..fab3a0f414 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -886,7 +886,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SName name = {0}; tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB); - //reuse this function for stream + // reuse this function for stream auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, "", detail); @@ -961,107 +961,102 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in return 0; } -// static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) { -// int64_t timestampMs = taosGetTimestampMs(); -// if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) { -// return -1; -// } +static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) { + int32_t code = -1; + int64_t timestampMs = taosGetTimestampMs(); + if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) { + return -1; + } -// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); -// if (pTrans == NULL) return -1; -// mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); -// if (mndTrancCheckConflict(pMnode, pTrans) != 0) { -// mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, -// checkpointId, -// tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); -// mndTransDrop(pTrans); -// return -1; -// } -// mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId); -// atomic_store_64(&pStream->currentTick, 1); -// taosWLockLatch(&pStream->lock); -// // 1. redo action: broadcast checkpoint source msg for all source vg -// int32_t totLevel = taosArrayGetSize(pStream->tasks); -// for (int32_t i = 0; i < totLevel; i++) { -// SArray *pLevel = taosArrayGetP(pStream->tasks, i); -// SStreamTask *pTask = taosArrayGetP(pLevel, 0); -// if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { -// int32_t sz = taosArrayGetSize(pLevel); -// for (int32_t j = 0; j < sz; j++) { -// SStreamTask *pTask = taosArrayGetP(pLevel, j); -// /*A(pTask->info.nodeId > 0);*/ -// SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); -// if (pVgObj == NULL) { -// taosWUnLockLatch(&pStream->lock); -// mndTransDrop(pTrans); -// return -1; -// } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); + if (pTrans == NULL) return -1; -// void *buf; -// int32_t tlen; -// if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, -// pTask->id.taskId) < 0) { -// mndReleaseVgroup(pMnode, pVgObj); -// taosWUnLockLatch(&pStream->lock); -// mndTransDrop(pTrans); -// return -1; -// } + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) { + mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId, + tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); + goto _ERR; + } -// STransAction action = {0}; -// action.epSet = mndGetVgroupEpset(pMnode, pVgObj); -// action.pCont = buf; -// action.contLen = tlen; -// action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE; + mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId); + taosWLockLatch(&pStream->lock); + pStream->currentTick = 1; + // 1. redo action: broadcast checkpoint source msg for all source vg -// mndReleaseVgroup(pMnode, pVgObj); + int32_t totLevel = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < totLevel; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + SStreamTask *pTask = taosArrayGetP(pLevel, 0); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int32_t sz = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < sz; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + /*A(pTask->info.nodeId > 0);*/ + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + if (pVgObj == NULL) { + taosWUnLockLatch(&pStream->lock); + goto _ERR; + } -// if (mndTransAppendRedoAction(pTrans, &action) != 0) { -// taosMemoryFree(buf); -// taosWUnLockLatch(&pStream->lock); -// mndReleaseStream(pMnode, pStream); -// mndTransDrop(pTrans); -// return -1; -// } -// } -// } -// } -// // 2. reset tick -// pStream->checkpointFreq = checkpointId; -// pStream->checkpointId = checkpointId; -// pStream->checkpointFreq = taosGetTimestampMs(); -// atomic_store_64(&pStream->currentTick, 0); -// // 3. commit log: stream checkpoint info -// pStream->version = pStream->version + 1; -// taosWUnLockLatch(&pStream->lock); + void *buf; + int32_t tlen; + if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, + pTask->id.taskId) < 0) { + mndReleaseVgroup(pMnode, pVgObj); + taosWUnLockLatch(&pStream->lock); + goto _ERR; + } -// // // code condtion + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj); + action.pCont = buf; + action.contLen = tlen; + action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE; -// SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); -// if (pCommitRaw == NULL) { -// mError("failed to prepare trans rebalance since %s", terrstr()); -// goto _ERR; -// } -// if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { -// sdbFreeRaw(pCommitRaw); -// mError("failed to prepare trans rebalance since %s", terrstr()); -// goto _ERR; -// } -// if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) { -// sdbFreeRaw(pCommitRaw); -// mError("failed to prepare trans rebalance since %s", terrstr()); -// goto _ERR; -// } + mndReleaseVgroup(pMnode, pVgObj); -// if (mndTransPrepare(pMnode, pTrans) != 0) { -// mError("failed to prepare trans rebalance since %s", terrstr()); -// goto _ERR; -// } -// mndTransDrop(pTrans); -// return 0; -// _ERR: -// mndTransDrop(pTrans); -// return -1; -// } + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(buf); + taosWUnLockLatch(&pStream->lock); + goto _ERR; + } + } + } + } + // 2. reset tick + pStream->checkpointFreq = checkpointId; + pStream->checkpointId = checkpointId; + pStream->checkpointFreq = taosGetTimestampMs(); + pStream->currentTick = 0; + // 3. commit log: stream checkpoint info + pStream->version = pStream->version + 1; + taosWUnLockLatch(&pStream->lock); + + SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); + if (pCommitRaw == NULL) { + mError("failed to prepare trans rebalance since %s", terrstr()); + goto _ERR; + } + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + sdbFreeRaw(pCommitRaw); + mError("failed to prepare trans rebalance since %s", terrstr()); + goto _ERR; + } + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) { + sdbFreeRaw(pCommitRaw); + mError("failed to prepare trans rebalance since %s", terrstr()); + goto _ERR; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("failed to prepare trans rebalance since %s", terrstr()); + goto _ERR; + } + code = 0; +_ERR: + mndTransDrop(pTrans); + return code; +} static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, int64_t checkpointId) { @@ -1111,8 +1106,8 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream pStream->checkpointId = checkpointId; pStream->checkpointFreq = taosGetTimestampMs(); - atomic_store_64(&pStream->currentTick, 0); - // 3. commit log: stream checkpoint info + pStream->currentTick = 0; + // 3. commit log: stream checkpoint info pStream->version = pStream->version + 1; taosWUnLockLatch(&pStream->lock); @@ -1212,43 +1207,17 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; int64_t checkpointId = pMsg->checkpointId; - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, NULL, MND_STREAM_CHECKPOINT_NAME); - if (pTrans == NULL) { - mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return -1; - } - mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId); - - const char *pDb = mndGetStreamDB(pMnode); - mndTransSetDbName(pTrans, pDb, "checkpoint"); - taosMemoryFree((void *)pDb); - - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, - tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); - mndTransDrop(pTrans); - return -1; - } - while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; - code = mndAddStreamCheckpointToTrans(pTrans, pStream, pMnode, checkpointId); + code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId); sdbRelease(pSdb, pStream); if (code == -1) { break; } } - if (code == 0) { - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("failed to prepre trans rebalance since %s", terrstr()); - } - } - - mndTransDrop(pTrans); return code; } @@ -1324,7 +1293,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SName name = {0}; tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB); - //reuse this function for stream + // reuse this function for stream auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, "", detail); @@ -2018,7 +1987,7 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) { const SEp *pEp = GET_ACTIVE_EP(pPrevEpset); - const SEp* p = GET_ACTIVE_EP(pCurrent); + const SEp *p = GET_ACTIVE_EP(pCurrent); if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) { return false; @@ -2287,7 +2256,7 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p // todo: this process should be executed by the write queue worker of the mnode int32_t mndProcessStreamHb(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; + SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -2312,8 +2281,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - int64_t k[2] = {p->streamId, p->taskId}; - int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); + int64_t k[2] = {p->streamId, p->taskId}; + int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); if (index == NULL) { continue; }