diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 19fe34fe01..7621615278 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -250,7 +250,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_CONSEN, "stream-chkpt-consen", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_REQ_CONSEN_CHKPT, "stream-req-consen-chkpt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CONSEN_TIMER, "stream-consen-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) @@ -342,6 +342,7 @@ TD_DEF_MSG_TYPE(TDMT_STREAM_CREATE, "stream-create", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_CONSEN_CHKPT, "stream-consen-chkpt", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8 diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 566e8dbbd8..4d5e18520c 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -29,7 +29,8 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessConsensusChkptRsp2(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); @@ -37,12 +38,12 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta); -int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* msg); int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); -int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen); +int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg); void tqSetRestoreVersionInfo(SStreamTask* pTask); int32_t tqExpandStreamTask(SStreamTask* pTask); diff --git a/include/libs/stream/streamMsg.h b/include/libs/stream/streamMsg.h index 8b6ca2c5cd..bdb8ff7f8e 100644 --- a/include/libs/stream/streamMsg.h +++ b/include/libs/stream/streamMsg.h @@ -223,16 +223,6 @@ typedef struct SRestoreCheckpointInfo { int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq); -typedef struct SRestoreCheckpointInfoRsp { - int64_t streamId; - int64_t checkpointId; - int64_t startTs; - int32_t taskId; -} SRestoreCheckpointInfoRsp; - -int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo); -int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo); - typedef struct { SMsgHead head; int64_t streamId; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 093a21c999..d0feebf814 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -616,9 +616,8 @@ typedef struct SStreamTaskState { typedef struct SCheckpointConsensusInfo { SArray* pTaskList; -// int64_t checkpointId; -// int64_t genTs; int32_t numOfTasks; + int64_t streamId; } SCheckpointConsensusInfo; int32_t streamSetupScheduleTrigger(SStreamTask* pTask); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 677e19d4c1..f3763ef0c5 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -233,6 +233,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -242,7 +243,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 7a0189b7c1..3d0587a11b 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -76,6 +76,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; @@ -96,7 +97,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 001696aecc..7d35fd71b7 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -972,10 +972,11 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index a86e06b486..7b4270462f 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -83,7 +83,7 @@ typedef struct SOrphanTask { typedef struct { SMsgHead head; -} SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp; +} SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp, SMStreamReqConsensChkptRsp; typedef struct STaskChkptInfo { int32_t nodeId; @@ -133,9 +133,8 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); -int32_t mndSendQuickConsensusChkptIdRsp(SRestoreCheckpointInfo *pReq, int32_t code, int64_t streamId, - int64_t checkpointId, SRpcHandleInfo *pRpcInfo); - +int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, + int64_t ts); void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); @@ -151,9 +150,7 @@ SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcHandleInfo *pRpcInfo); void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); -int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg, int64_t checkpointId); int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId); -int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a1016ad96c..a8dc1e42bb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -61,6 +61,7 @@ static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg); static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg); +static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); @@ -107,6 +108,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp); // for msgs inside mnode // TODO change the name @@ -119,7 +121,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport); - mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_CONSEN, mndProcessConsensusCheckpointId); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, mndProcessConsensusCheckpointId); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); @@ -2611,23 +2613,14 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { taosThreadMutexUnlock(&execInfo.lock); - { - SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamUpdateChkptRsp)}; - rsp.pCont = rpcMallocCont(rsp.contLen); - SMsgHead *pHead = rsp.pCont; - pHead->vgId = htonl(req.nodeId); - - tmsgSendRsp(&rsp); - pReq->info.handle = NULL; // disable auto rsp - } - + doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS); return 0; } -static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, bool* pAllEqual) { +static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pExistedTasks) { int32_t num = 0; int64_t chkId = INT64_MAX; - *pAllEqual = true; + *pExistedTasks = 0; for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId* p = taosArrayGet(execInfo.pTaskList, i); @@ -2637,16 +2630,12 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, bool* pAllEq num += 1; STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); - - if (chkId != INT64_MAX && chkId != pe->checkpointInfo.latestId) { - *pAllEqual = false; - } - if (chkId > pe->checkpointInfo.latestId) { chkId = pe->checkpointInfo.latestId; } } + *pExistedTasks = num; if (num < numOfTasks) { // not all task send info to mnode through hbMsg, no valid checkpoint Id return -1; } @@ -2654,6 +2643,16 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, bool* pAllEq return chkId; } +static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) { + SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize}; + rsp.pCont = rpcMallocCont(rsp.contLen); + SMsgHead *pHead = rsp.pCont; + pHead->vgId = htonl(vgId); + + tmsgSendRsp(&rsp); + pInfo->handle = NULL; // disable auto rsp +} + static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SDecoder decoder = {0}; @@ -2675,9 +2674,8 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) { // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. taosThreadMutexLock(&execInfo.lock); - SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); - // mnode handle the create stream transaction too slow may cause this problem + SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); if (pStream == NULL) { mWarn("failed to find the stream:0x%" PRIx64 ", not handle consensus-checkpointId", req.streamId); @@ -2688,11 +2686,9 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) { if (p == NULL) { mError("failed to find the stream:0x%" PRIx64 " in buf, not handle consensus-checkpointId", req.streamId); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - - mndSendQuickConsensusChkptIdRsp(&req, terrno, req.streamId, 0, &pMsg->info); - taosThreadMutexUnlock(&execInfo.lock); - pMsg->info.handle = NULL; // disable auto rsp + + doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); return -1; } else { mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", @@ -2706,36 +2702,35 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) { int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly - mndSendQuickConsensusChkptIdRsp(&req, TSDB_CODE_SUCCESS, req.streamId, 0, &pMsg->info); - taosThreadMutexUnlock(&execInfo.lock); - pMsg->info.handle = NULL; // disable auto rsp + mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, 0, req.startTs); + + doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); return TSDB_CODE_SUCCESS; } - bool allEqual = true; - int64_t chkId = getConsensusId(req.streamId, numOfTasks, &allEqual); + int32_t num = 0; + int64_t chkId = getConsensusId(req.streamId, numOfTasks, &num); // some tasks not send hbMsg to mnode yet, wait for 5s. if (chkId == -1) { - mDebug( - "not all task send hbMsg yet, add into list and wait for 10s to check the consensus-checkpointId again, " - "s-task:0x%x", req.taskId); + mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", req.taskId, num, + numOfTasks); SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks); mndAddConsensusTasks(pInfo, &req, &pMsg->info); - taosThreadMutexUnlock(&execInfo.lock); - pMsg->info.handle = NULL; // disable auto rsp + taosThreadMutexUnlock(&execInfo.lock); + doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); return 0; } if (chkId == req.checkpointId) { mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64 ", meta-stored checkpointId:%" PRId64, req.nodeId, req.streamId, pStream->name, chkId, pStream->checkpointId); - mndSendQuickConsensusChkptIdRsp(&req, TSDB_CODE_SUCCESS, req.streamId, chkId, &pMsg->info); + mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, chkId, req.startTs); taosThreadMutexUnlock(&execInfo.lock); - pMsg->info.handle = NULL; // disable auto rsp + doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); return 0; } @@ -2748,14 +2743,14 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) { } taosThreadMutexUnlock(&execInfo.lock); - pMsg->info.handle = NULL; // disable auto rsp - + doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); return 0; } int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { + SMnode *pMnode = pMsg->info.node; int64_t now = taosGetTimestampMs(); - int64_t streamId = -1; // todo: fix only one + SArray *pStreamList = taosArrayInit(4, sizeof(int64_t)); mDebug("start to process consensus-checkpointId in tmr"); taosThreadMutexLock(&execInfo.lock); @@ -2764,40 +2759,42 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) { SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter; - int32_t j = 0; + int64_t streamId = -1; int32_t num = taosArrayGetSize(pInfo->pTaskList); - SArray *pList = taosArrayInit(4, sizeof(int32_t)); - for (; j < num; ++j) { - SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j); + SStreamObj *pStream = mndGetStreamObj(pMnode, pInfo->streamId); + if (pStream == NULL) { // stream has been dropped already + mDebug("stream:0x%"PRIx64" dropped already, continue", pInfo->streamId); + continue; + } - if ((now - pe->ts) > 10 * 1000) { - bool allEqual = true; - int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &allEqual); + for (int32_t j = 0; j < num; ++j) { + SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j); + streamId = pe->req.streamId; + + if ((now - pe->ts) >= 10 * 1000) { + int32_t existed = 0; + int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed); if (chkId == -1) { - mDebug("tasks send hbMsg for stream:0x%" PRIx64 ", wait for next round", pe->req.streamId); + mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed, + pInfo->numOfTasks, pe->req.taskId); break; } - if (allEqual) { - mDebug("all has identical checkpointId for stream:0x%"PRIx64" send checkpointId to s-task:0x%x", - pe->req.streamId, pe->req.taskId); - - mndSendQuickConsensusChkptIdRsp(&pe->req, TSDB_CODE_SUCCESS, pe->req.streamId, chkId, &pe->rspInfo); - } else { - ASSERT(chkId <= pe->req.checkpointId); - mndSendQuickConsensusChkptIdRsp(&pe->req, TSDB_CODE_SUCCESS, pe->req.streamId, chkId, &pe->rspInfo); - } + ASSERT(chkId <= pe->req.checkpointId); + mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs); taosArrayPush(pList, &pe->req.taskId); streamId = pe->req.streamId; } else { mDebug("s-task:0x%x sendTs:%" PRId64 " wait %2.fs already, wait for next round to check", pe->req.taskId, - (now - pe->ts)/ 1000.0, pe->ts); + (now - pe->ts) / 1000.0, pe->ts); } } + mndReleaseStream(pMnode, pStream); + if (taosArrayGetSize(pList) > 0) { for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { int32_t *taskId = taosArrayGet(pList, i); @@ -2815,12 +2812,19 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { if (taosArrayGetSize(pInfo->pTaskList) == 0) { mndClearConsensusRspEntry(pInfo); - mndClearConsensusCheckpointId(execInfo.pStreamConsensus, streamId); + ASSERT(streamId != -1); + taosArrayPush(pStreamList, &streamId); } } + for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) { + int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i); + mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId); + } + taosThreadMutexUnlock(&execInfo.lock); + taosArrayDestroy(pStreamList); mDebug("end to process consensus-checkpointId in tmr"); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 9ee820925c..7d9b9e4571 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -651,6 +651,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) { void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { taosThreadMutexLock(&pExecNode->lock); + // 1. remove task entries SStreamTaskIter *pIter = createStreamTaskIter(pStream); while (streamTaskIterNextTask(pIter)) { SStreamTask *pTask = streamTaskIterGetCurrent(pIter); @@ -660,8 +661,11 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { } ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); - taosThreadMutexUnlock(&pExecNode->lock); + // 2. remove stream entry in consensus hash table + mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); + + taosThreadMutexUnlock(&pExecNode->lock); destroyStreamTaskIter(pIter); } @@ -835,45 +839,98 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; } -int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg, int64_t checkpointId) { +static int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) { + SRestoreCheckpointInfo req = { + .taskId = pTask->id.taskId, + .streamId = pTask->id.streamId, + .checkpointId = checkpointId, + .startTs = ts, + .nodeId = pTask->info.nodeId, + }; + int32_t code = 0; int32_t blen; - - SRestoreCheckpointInfoRsp req = { - .streamId = pInfo->streamId, .taskId = pInfo->taskId, .checkpointId = checkpointId, .startTs = pInfo->startTs}; - - tEncodeSize(tEncodeRestoreCheckpointInfoRsp, &req, blen, code); + tEncodeSize(tEncodeRestoreCheckpointInfo, &req, blen, code); if (code < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - int32_t tlen = sizeof(SMsgHead) + blen; - void *abuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t tlen = sizeof(SMsgHead) + blen; + + void *pBuf = taosMemoryMalloc(tlen); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); - tEncodeRestoreCheckpointInfoRsp(&encoder, &req); + tEncodeRestoreCheckpointInfo(&encoder, &req); - SMsgHead *pMsgHead = (SMsgHead *)pMsg->pCont; + SMsgHead *pMsgHead = (SMsgHead *)pBuf; pMsgHead->contLen = htonl(tlen); - pMsgHead->vgId = htonl(pInfo->nodeId); + pMsgHead->vgId = htonl(pTask->info.nodeId); + tEncoderClear(&encoder); - tmsgSendRsp(pMsg); + SEpSet epset = {0}; + bool hasEpset = false; + code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS || !hasEpset) { + taosMemoryFree(pBuf); + return code; + } + + code = setTransAction(pTrans, pBuf, tlen, TDMT_STREAM_CONSEN_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pBuf); + } + return code; } -int32_t mndSendQuickConsensusChkptIdRsp(SRestoreCheckpointInfo *pReq, int32_t code, int64_t streamId, - int64_t checkpointId, SRpcHandleInfo *pRpcInfo) { - SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead)}; - rsp.pCont = rpcMallocCont(rsp.contLen); +int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, + int64_t ts) { + char msg[128] = {0}; + snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId); - SMsgHead *pHead = rsp.pCont; - pHead->vgId = htonl(pReq->nodeId); + STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg); + if (pTrans == NULL) { + return terrno; + } - mDebug("stream:0x%" PRIx64 " consensus-checkpointId:%" PRId64 " exists, s-task:0x%x send to vnode", - streamId, checkpointId, pReq->taskId); - return doSendConsensusCheckpointRsp(pReq, &rsp, checkpointId); + STaskId id = {.streamId = pStream->uid, .taskId = taskId}; + SStreamTask *pTask = mndGetStreamTask(&id, pStream); + ASSERT(pTask); + + /*int32_t code = */ mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid); + int32_t code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts); + if (code != 0) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return code; + } + + code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); + if (code != TSDB_CODE_SUCCESS) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + + return TSDB_CODE_ACTION_IN_PROGRESS; } SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks) { @@ -885,6 +942,7 @@ SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, SCheckpointConsensusInfo p = { .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)), .numOfTasks = numOfTasks, + .streamId = streamId, }; taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 69a7bc7ba4..60e57e8a2f 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -128,6 +128,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: return tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg); + case TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP: + return tqStreamProcessConsensusChkptRsp2(pSnode->pMeta, pMsg); default: sndError("invalid snode msg:%d", pMsg->msgType); ASSERT(0); @@ -149,15 +151,15 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { case TDMT_VND_STREAM_TASK_UPDATE: return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); case TDMT_VND_STREAM_TASK_RESET: - return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg); + return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont); case TDMT_STREAM_TASK_PAUSE: return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont); case TDMT_STREAM_TASK_RESUME: return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false); case TDMT_STREAM_TASK_UPDATE_CHKPT: - return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont, pMsg->contLen); - case TDMT_MND_STREAM_CHKPT_CONSEN_RSP: - return tqStreamProcessConsensusChkptRsp(pSnode->pMeta, pMsg); + return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont); + case TDMT_STREAM_CONSEN_CHKPT: + return tqStreamTaskProcessConsenChkptIdReq(pSnode->pMeta, pMsg); default: ASSERT(0); } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 1bec226489..4a47e08730 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -298,6 +298,7 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen); +int32_t tqProcessTaskConsenChkptIdReq(STQ* pTq, SRpcMsg* pMsg); // sma int32_t smaInit(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0a64b9c165..ac57a003c5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1016,7 +1016,11 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { } int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { - return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg, msgLen); + return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg); +} + +int32_t tqProcessTaskConsenChkptIdReq(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamTaskProcessConsenChkptIdReq(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { @@ -1239,7 +1243,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg); + return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg->pCont); } int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) { @@ -1277,5 +1281,5 @@ int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamProcessConsensusChkptRsp(pTq->pStreamMeta, pMsg); + return tqStreamProcessConsensusChkptRsp2(pTq->pStreamMeta, pMsg); } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c40332ff39..cb480d09bb 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -670,7 +670,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen return 0; } -int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen) { +int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) { SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg; int32_t vgId = pMeta->vgId; @@ -858,8 +858,8 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { return TSDB_CODE_SUCCESS; } -int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg->pCont; +int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) { + SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg; SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); if (pTask == NULL) { @@ -1115,6 +1115,8 @@ int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { ret int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } +int32_t tqStreamProcessConsensusChkptRsp2(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } + int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont; @@ -1130,22 +1132,21 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return TSDB_CODE_SUCCESS; } -int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { +int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t vgId = pMeta->vgId; + int32_t code = 0; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); - SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; int64_t now = taosGetTimestampMs(); - SRestoreCheckpointInfoRsp req = {0}; + SRestoreCheckpointInfo req = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); - rsp.info.handle = NULL; - if (tDecodeRestoreCheckpointInfoRsp(&decoder, &req) < 0) { - // rsp.code = TSDB_CODE_MSG_DECODE_ERROR; // disable it temporarily - tqError("vgId:%d failed to decode restore task checkpointId, code:%s", vgId, tstrerror(rsp.code)); + if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) { + tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code)); tDecoderClear(&decoder); return TSDB_CODE_SUCCESS; } @@ -1154,7 +1155,7 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { - tqError("vgId:%d process restore checkpointId req, failed to acquire task:0x%x, it may have been dropped already", + tqError("vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, req.taskId); streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return TSDB_CODE_SUCCESS; @@ -1182,7 +1183,7 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { pTask->chkInfo.checkpointId = req.checkpointId; tqSetRestoreVersionInfo(pTask); } else { - tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current checkpointId, no need to update", + tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current checkpointId, not update", pTask->id.idStr, vgId, req.checkpointId); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 3757cd00bc..04839c3357 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -630,6 +630,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg goto _err; } } break; + case TDMT_STREAM_CONSEN_CHKPT: { + if (pVnode->restored) { + tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg); + } + } break; case TDMT_STREAM_TASK_PAUSE: { if (pVnode->restored && vnodeIsLeader(pVnode) && tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { @@ -647,11 +652,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg tqProcessTaskResetReq(pVnode->pTq, pMsg); } } break; - case TDMT_MND_STREAM_CHKPT_CONSEN_RSP: { - if (pVnode->restored) { - tqProcessTaskConsensusChkptRsp(pVnode->pTq, pMsg); - } - } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { @@ -861,6 +861,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg); case TDMT_MND_STREAM_CHKPT_REPORT_RSP: return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg); + case TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP: + return tqProcessTaskConsensusChkptRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d5f7d6ef21..5cd084e6a2 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1153,7 +1153,7 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { tEncoderClear(&encoder); SRpcMsg msg = {0}; - initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_CONSEN, buf, tlen); + initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, buf, tlen); stDebug("s-task:%s vgId:%d send latest checkpointId:%" PRId64 " to mnode to get the consensus checkpointId", id, vgId, pInfo->checkpointId); diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index e0435156e2..f02f661143 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -650,23 +650,3 @@ int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* tEndDecode(pDecoder); return 0; } - -int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo) { - if (tStartEncode(pCoder) < 0) return -1; - if (tEncodeI64(pCoder, pInfo->startTs) < 0) return -1; - if (tEncodeI64(pCoder, pInfo->streamId) < 0) return -1; - if (tEncodeI32(pCoder, pInfo->taskId) < 0) return -1; - if (tEncodeI64(pCoder, pInfo->checkpointId) < 0) return -1; - tEndEncode(pCoder); - return 0; -} - -int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo) { - if (tStartDecode(pCoder) < 0) return -1; - if (tDecodeI64(pCoder, &pInfo->startTs) < 0) return -1; - if (tDecodeI64(pCoder, &pInfo->streamId) < 0) return -1; - if (tDecodeI32(pCoder, &pInfo->taskId) < 0) return -1; - if (tDecodeI64(pCoder, &pInfo->checkpointId) < 0) return -1; - tEndDecode(pCoder); - return 0; -} \ No newline at end of file