add trigger checkpoint

This commit is contained in:
yihaoDeng 2023-06-07 14:32:48 +00:00
parent 79b628101c
commit 1f1f37312d
6 changed files with 34 additions and 30 deletions

View File

@ -174,13 +174,14 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
// TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
// TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)

View File

@ -200,7 +200,7 @@ int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L);
// internal // internal
int32_t tsTransPullupInterval = 2; int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 1; int32_t tsStreamCheckpointTickInterval = 50;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 3600; int32_t tsTtlPushInterval = 3600;
int32_t tsGrantHBInterval = 60; int32_t tsGrantHBInterval = 60;

View File

@ -124,12 +124,11 @@ static void mndCalMqRebalance(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = { .msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen }; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
} }
#if 0
static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildCheckpointTickMsg(&contLen, sec); void *pReq = mndBuildCheckpointTickMsg(&contLen, sec);
@ -142,7 +141,6 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
} }
#endif
static void mndPullupTelem(SMnode *pMnode) { static void mndPullupTelem(SMnode *pMnode) {
mTrace("pullup telem msg"); mTrace("pullup telem msg");
@ -266,11 +264,9 @@ static void *mndThreadFp(void *param) {
mndCalMqRebalance(pMnode); mndCalMqRebalance(pMnode);
} }
#if 0
if (sec % tsStreamCheckpointTickInterval == 0) { if (sec % tsStreamCheckpointTickInterval == 0) {
mndStreamCheckpointTick(pMnode, sec); mndStreamCheckpointTick(pMnode, sec);
} }
#endif
if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) { if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
mndPullupTelem(pMnode); mndPullupTelem(pMnode);
@ -586,7 +582,7 @@ int32_t mndIsCatchUp(SMnode *pMnode) {
return syncIsCatchUp(rid); return syncIsCatchUp(rid);
} }
ESyncRole mndGetRole(SMnode *pMnode){ ESyncRole mndGetRole(SMnode *pMnode) {
int64_t rid = pMnode->syncMgmt.sync; int64_t rid = pMnode->syncMgmt.sync;
return syncGetRole(rid); return syncGetRole(rid);
} }

View File

@ -69,9 +69,9 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
@ -794,8 +794,6 @@ _OVER:
return code; return code;
} }
#if 0
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
@ -836,8 +834,8 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con
req.checkpointId = pMsg->checkpointId; req.checkpointId = pMsg->checkpointId;
req.nodeId = pTask->nodeId; req.nodeId = pTask->nodeId;
req.expireTime = -1; req.expireTime = -1;
req.streamId = pTask->streamId; req.streamId = pTask->id.streamId;
req.taskId = pTask->taskId; req.taskId = pTask->id.taskId;
int32_t code; int32_t code;
int32_t blen; int32_t blen;
@ -882,7 +880,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SStreamObj *pStream = mndAcquireStream(pMnode, pMsg->streamName); SStreamObj *pStream = mndAcquireStream(pMnode, pMsg->streamName);
if (pStream == NULL || pStream->uid != pMsg->streamId) { if (pStream == NULL || pStream->uid != pMsg->streamId) {
mError("start checkpointing failed since stream %s not found", pMsg->streamName); mError("failed to checkpoint since stream %s not found", pMsg->streamName);
return -1; return -1;
} }
@ -891,6 +889,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
if (pTrans == NULL) return -1; if (pTrans == NULL) return -1;
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) { if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
mError("failed to checkpoint since stream %s", tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
@ -960,8 +959,6 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
return 0; return 0;
} }
#endif
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
@ -1403,7 +1400,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) { static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) {
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {

View File

@ -208,15 +208,16 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
// tq // tq
int tqInit(); int tqInit();
void tqCleanUp(); void tqCleanUp();
STQ* tqOpen(const char* path, SVnode* pVnode); STQ* tqOpen(const char* path, SVnode* pVnode);
void tqNotifyClose(STQ*); void tqNotifyClose(STQ*);
void tqClose(STQ*); void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);

View File

@ -1120,7 +1120,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
return -1; return -1;
} }
qDebug("s-task:%s set start wal scan start ver:%"PRId64, pTask->id.idStr, sversion); qDebug("s-task:%s set start wal scan start ver:%" PRId64, pTask->id.idStr, sversion);
walReaderSeekVer(pTask->exec.pWalReader, sversion); walReaderSeekVer(pTask->exec.pWalReader, sversion);
pTask->chkInfo.currentVer = sversion; pTask->chkInfo.currentVer = sversion;
@ -1506,3 +1506,13 @@ int32_t tqStartStreamTasks(STQ* pTq) {
return 0; return 0;
} }
int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
int32_t len = msgLen - sizeof(SMsgHead);
taosWLockLatch(&pMeta->lock);
taosWUnLockLatch(&pMeta->lock);
return 0;
}