From 1f1f37312d6b0bb100cf84e3a0af38660695e734 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 7 Jun 2023 14:32:48 +0000 Subject: [PATCH] add trigger checkpoint --- include/common/tmsgdef.h | 5 +++-- source/common/src/tglobal.c | 2 +- source/dnode/mnode/impl/src/mndMain.c | 8 ++------ source/dnode/mnode/impl/src/mndStream.c | 18 +++++++----------- source/dnode/vnode/src/inc/vnodeInt.h | 19 ++++++++++--------- source/dnode/vnode/src/tq/tq.c | 12 +++++++++++- 6 files changed, 34 insertions(+), 30 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 1f2d597496..6d849e164c 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -174,13 +174,14 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL) - // TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) - // TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index a79351d5cc..007910754c 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -200,7 +200,7 @@ int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L); // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointTickInterval = 1; +int32_t tsStreamCheckpointTickInterval = 50; int32_t tsTtlUnit = 86400; int32_t tsTtlPushInterval = 3600; int32_t tsGrantHBInterval = 60; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 381b1e64ed..da4095d7ff 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -124,12 +124,11 @@ static void mndCalMqRebalance(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { - SRpcMsg rpcMsg = { .msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen }; + SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } } -#if 0 static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { int32_t contLen = 0; void *pReq = mndBuildCheckpointTickMsg(&contLen, sec); @@ -142,7 +141,6 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } } -#endif static void mndPullupTelem(SMnode *pMnode) { mTrace("pullup telem msg"); @@ -266,11 +264,9 @@ static void *mndThreadFp(void *param) { mndCalMqRebalance(pMnode); } -#if 0 if (sec % tsStreamCheckpointTickInterval == 0) { mndStreamCheckpointTick(pMnode, sec); } -#endif if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) { mndPullupTelem(pMnode); @@ -586,7 +582,7 @@ int32_t mndIsCatchUp(SMnode *pMnode) { return syncIsCatchUp(rid); } -ESyncRole mndGetRole(SMnode *pMnode){ +ESyncRole mndGetRole(SMnode *pMnode) { int64_t rid = pMnode->syncMgmt.sync; return syncGetRole(rid); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 39a1fa223f..9ecc1ad611 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -69,9 +69,9 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp); - - // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); - // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); + mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); @@ -794,8 +794,6 @@ _OVER: return code; } -#if 0 - static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -836,8 +834,8 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con req.checkpointId = pMsg->checkpointId; req.nodeId = pTask->nodeId; req.expireTime = -1; - req.streamId = pTask->streamId; - req.taskId = pTask->taskId; + req.streamId = pTask->id.streamId; + req.taskId = pTask->id.taskId; int32_t code; int32_t blen; @@ -882,7 +880,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = mndAcquireStream(pMnode, pMsg->streamName); if (pStream == NULL || pStream->uid != pMsg->streamId) { - mError("start checkpointing failed since stream %s not found", pMsg->streamName); + mError("failed to checkpoint since stream %s not found", pMsg->streamName); return -1; } @@ -891,6 +889,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { if (pTrans == NULL) return -1; mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (mndTrancCheckConflict(pMnode, pTrans) != 0) { + mError("failed to checkpoint since stream %s", tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); mndReleaseStream(pMnode, pStream); mndTransDrop(pTrans); return -1; @@ -960,8 +959,6 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { return 0; } -#endif - static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; @@ -1403,7 +1400,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } - static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) { SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); if (pReq == NULL) { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2811fc35b0..e035781460 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -208,15 +208,16 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); // tq -int tqInit(); -void tqCleanUp(); -STQ* tqOpen(const char* path, SVnode* pVnode); -void tqNotifyClose(STQ*); -void tqClose(STQ*); -int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); -int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); -int tqUnregisterPushHandle(STQ* pTq, void* pHandle); -int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. +int tqInit(); +void tqCleanUp(); +STQ* tqOpen(const char* path, SVnode* pVnode); +void tqNotifyClose(STQ*); +void tqClose(STQ*); +int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); +int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); +int tqUnregisterPushHandle(STQ* pTq, void* pHandle); +int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. +int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aa6bbbe9df..8e696e5484 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1120,7 +1120,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } - qDebug("s-task:%s set start wal scan start ver:%"PRId64, pTask->id.idStr, sversion); + qDebug("s-task:%s set start wal scan start ver:%" PRId64, pTask->id.idStr, sversion); walReaderSeekVer(pTask->exec.pWalReader, sversion); pTask->chkInfo.currentVer = sversion; @@ -1506,3 +1506,13 @@ int32_t tqStartStreamTasks(STQ* pTq) { return 0; } +int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) { + int32_t vgId = TD_VID(pTq->pVnode); + SStreamMeta* pMeta = pTq->pStreamMeta; + char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead)); + int32_t len = msgLen - sizeof(SMsgHead); + + taosWLockLatch(&pMeta->lock); + taosWUnLockLatch(&pMeta->lock); + return 0; +}