diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index c92649f1f7..3515df3127 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -250,7 +250,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_CONSEN, "stream-chkpt-consen", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CONSEN_TIMER, "stream-consen-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) @@ -341,6 +341,7 @@ TD_DEF_MSG_TYPE(TDMT_STREAM_CREATE, "stream-create", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_CONSEN_CHKPT, "stream-consen-chkpt", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8 diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 566e8dbbd8..4d5e18520c 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -29,7 +29,8 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessConsensusChkptRsp2(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t 
msgLen, bool isLeader, bool restored); @@ -37,12 +38,12 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta); -int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* msg); int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); -int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen); +int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg); void tqSetRestoreVersionInfo(SStreamTask* pTask); int32_t tqExpandStreamTask(SStreamTask* pTask); diff --git a/include/libs/stream/streamMsg.h b/include/libs/stream/streamMsg.h index b69032330d..34921daac3 100644 --- a/include/libs/stream/streamMsg.h +++ b/include/libs/stream/streamMsg.h @@ -216,6 +216,7 @@ typedef struct SRestoreCheckpointInfo { int64_t startTs; int64_t streamId; int64_t checkpointId; // latest checkpoint id + int32_t transId; // transaction id of the update the consensus-checkpointId transaction int32_t taskId; int32_t nodeId; } SRestoreCheckpointInfo; @@ -223,16 +224,6 @@ typedef struct SRestoreCheckpointInfo { int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq); -typedef struct SRestoreCheckpointInfoRsp { - int64_t streamId; - int64_t checkpointId; - int64_t startTs; - int32_t taskId; -} 
SRestoreCheckpointInfoRsp; - -int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo); -int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo); - typedef struct { SMsgHead head; int64_t streamId; @@ -242,8 +233,7 @@ typedef struct { typedef struct SCheckpointConsensusEntry { SRestoreCheckpointInfo req; - SRpcMsg rsp; - int64_t ts; + int64_t ts; } SCheckpointConsensusEntry; #ifdef __cplusplus diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e98039d2fe..5ba0ce454c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -272,9 +272,9 @@ typedef struct SCheckpointInfo { int64_t checkpointTime; // latest checkpoint time int64_t processedVer; int64_t nextProcessVer; // current offset in WAL, not serialize it - + int64_t msgVer; + int32_t consensusTransId;// consensus checkpoint id SActiveCheckpointInfo* pActiveInfo; - int64_t msgVer; } SCheckpointInfo; typedef struct SStreamStatus { @@ -289,6 +289,8 @@ typedef struct SStreamStatus { int32_t inScanHistorySentinel; bool appendTranstateBlock; // has append the transfer state data block already bool removeBackendFiles; // remove backend files on disk when free stream tasks + bool sendConsensusChkptId; + bool requireConsensusChkptId; } SStreamStatus; typedef struct SDataRange { @@ -528,10 +530,11 @@ typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5); +void tFreeStreamTask(SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); -void tFreeStreamTask(SStreamTask* pTask); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); +void 
streamFreeTaskState(SStreamTask* pTask, ETaskStatus status); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId); @@ -568,14 +571,16 @@ typedef struct { } SStreamScanHistoryReq; typedef struct STaskCkptInfo { - int64_t latestId; // saved checkpoint id - int64_t latestVer; // saved checkpoint ver - int64_t latestTime; // latest checkpoint time - int64_t latestSize; // latest checkpoint size - int8_t remoteBackup; // latest checkpoint backup done - int64_t activeId; // current active checkpoint id - int32_t activeTransId; // checkpoint trans id - int8_t failed; // denote if the checkpoint is failed or not + int64_t latestId; // saved checkpoint id + int64_t latestVer; // saved checkpoint ver + int64_t latestTime; // latest checkpoint time + int64_t latestSize; // latest checkpoint size + int8_t remoteBackup; // latest checkpoint backup done + int64_t activeId; // current active checkpoint id + int32_t activeTransId; // checkpoint trans id + int8_t failed; // denote if the checkpoint is failed or not + int8_t consensusChkptId; // required the consensus-checkpointId + int64_t consensusTs; // } STaskCkptInfo; typedef struct STaskStatusEntry { @@ -586,8 +591,6 @@ typedef struct STaskStatusEntry { int32_t nodeId; SVersionRange verRange; // start/end version in WAL, only valid for source task int64_t processedVer; // only valid for source task - bool inputQChanging; // inputQ is changing or not - int64_t inputQUnchangeCounter; double inputQUsed; // in MiB double inputRate; double procsThroughput; // duration between one element put into input queue and being processed. 
@@ -616,8 +619,8 @@ typedef struct SStreamTaskState { typedef struct SCheckpointConsensusInfo { SArray* pTaskList; - int64_t checkpointId; - int64_t genTs; + int32_t numOfTasks; + int64_t streamId; } SCheckpointConsensusInfo; int32_t streamSetupScheduleTrigger(SStreamTask* pTask); @@ -800,6 +803,7 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r void streamTaskSendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp); int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp); +int32_t streamTaskSendPreparedCheckpointsourceRsp(SStreamTask* pTask); #ifdef __cplusplus diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 677e19d4c1..9b987b3237 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -233,6 +233,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -242,7 +243,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if 
(dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 7a0189b7c1..5c2f54fd10 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -76,6 +76,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; @@ -96,7 +97,6 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 001696aecc..fbe1925e3f 100644 --- 
a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -972,10 +972,10 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b261f89057..0b6b6a9ef2 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -83,7 +83,7 @@ typedef struct SOrphanTask { typedef struct { SMsgHead head; -} SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp; +} SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp, SMStreamReqConsensChkptRsp; typedef struct STaskChkptInfo { int32_t nodeId; @@ -133,8 +133,8 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); -int32_t mndSendConsensusCheckpointIdRsp(SArray* pList, int64_t checkpointId); - 
+int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, + int64_t ts); void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); @@ -146,14 +146,10 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInf int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId); -void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg); -int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo *pInfo, SStreamObj *pStream); -bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo *pInfo, int32_t numOfTasks, int32_t* pTotal); +SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks); +void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo); void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); -int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg, int64_t checkpointId); int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId); -int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 398ea5d589..723e3701a1 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -177,6 +177,15 @@ static void mndStreamCheckNode(SMnode *pMnode) { } } +static void mndStreamConsensusChkpt(SMnode *pMnode) { + int32_t contLen = 0; + void *pReq = mndBuildTimerMsg(&contLen); + if (pReq != NULL) { + SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CONSEN_TIMER, .pCont = pReq, .contLen = contLen}; + 
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + } +} + static void mndPullupTelem(SMnode *pMnode) { mTrace("pullup telem msg"); int32_t contLen = 0; @@ -308,7 +317,6 @@ static int32_t minCronTime() { min = TMIN(min, tsCompactPullupInterval); min = TMIN(min, tsMqRebalanceInterval); min = TMIN(min, tsStreamCheckpointInterval); - min = TMIN(min, 6); // checkpointRemain min = TMIN(min, tsStreamNodeCheckInterval); min = TMIN(min, tsArbHeartBeatIntervalSec); min = TMIN(min, tsArbCheckSyncIntervalSec); @@ -353,6 +361,10 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) { mndStreamCheckNode(pMnode); } + if (sec % 5 == 0) { + mndStreamConsensusChkpt(pMnode); + } + if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) { mndPullupTelem(pMnode); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 20cd415a6f..415d1ff9f0 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -59,7 +59,9 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); -static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq); +//static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg); +static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg); +static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); @@ -106,6 +108,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, 
TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp); // for msgs inside mnode // TODO change the name @@ -118,11 +121,11 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport); - mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_CONSEN, mndProcessConsensusCheckpointId); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq); @@ -803,7 +806,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name); saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo); - mndRegisterConsensusChkptId(execInfo.pStreamConsensus, streamObj.uid); +// mndRegisterConsensusChkptId(execInfo.pStreamConsensus, streamObj.uid); taosThreadMutexUnlock(&execInfo.lock); // execute creation @@ -1268,9 +1271,6 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true); sdbRelease(pSdb, p); - // clear the consensus checkpoint info - mndClearConsensusCheckpointId(execInfo.pStreamConsensus, p->uid); - if (code != -1) { started += 1; @@ -2340,7 +2340,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { taosThreadMutexUnlock(&execInfo.lock); if 
(numOfNodes == 0) {
-    mDebug("end to do stream task node change checking, no vgroup exists, do nothing");
+    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
     execInfo.ts = ts;
     atomic_store_32(&mndNodeCheckSentinel, 0);
     return 0;
@@ -2643,113 +2643,238 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
 
   taosThreadMutexUnlock(&execInfo.lock);
 
-  {
-    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamUpdateChkptRsp)};
-    rsp.pCont = rpcMallocCont(rsp.contLen);
-    SMsgHead *pHead = rsp.pCont;
-    pHead->vgId = htonl(req.nodeId);
-
-    tmsgSendRsp(&rsp);
-    pReq->info.handle = NULL;  // disable auto rsp
-  }
-
+  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
   return 0;
 }
 
-static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) {
-  SMnode  *pMnode = pReq->info.node;
-  SDecoder decoder = {0};
+// Pick the consensus checkpointId candidate for one stream: the smallest "latest checkpointId"
+// among the status entries of all tasks of `streamId` found in execInfo.pTaskList/pTaskMap.
+//   streamId      stream whose tasks are examined
+//   numOfTasks    number of tasks that must have a status entry before a result is valid
+//   pExistedTasks [out] number of tasks of this stream that have a status entry
+//   pAllSame      [out] true only when every examined task has the same latest checkpointId
+// Returns the minimum latest checkpointId, or -1 when fewer than numOfTasks tasks were found.
+// The function takes no lock itself; the caller visible in this file holds execInfo.lock.
+static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pExistedTasks, bool *pAllSame) {
+  int32_t num = 0;
+  int64_t chkId = INT64_MAX;
+  *pExistedTasks = 0;
+  *pAllSame = true;
 
-  SRestoreCheckpointInfo req = {0};
-  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
+  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
+    STaskId* p = taosArrayGet(execInfo.pTaskList, i);
+    if (p == NULL || p->streamId != streamId) {
+      continue;
+    }
 
-  if (tDecodeRestoreCheckpointInfo(&decoder, &req)) {
-    tDecoderClear(&decoder);
-    terrno = TSDB_CODE_INVALID_MSG;
-    mError("invalid task consensus-checkpoint msg received");
+    STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
+    if (pe == NULL) {  // listed but no status entry: count it as "not reported" instead of dereferencing NULL
+      continue;
+    }
+
+    num += 1;
+    int64_t latestId = pe->checkpointInfo.latestId;
+    if (num == 1) {
+      chkId = latestId;
+    } else if (latestId != chkId) {
+      // chkId is the running minimum; any id different from it (larger or smaller) breaks "all same"
+      *pAllSame = false;
+      if (latestId < chkId) {
+        chkId = latestId;
+      }
+    }
+  }
+
+  *pExistedTasks = num;
+  if (num < numOfTasks) {  // not all task send info to mnode through hbMsg, no valid checkpoint Id
     return -1;
   }
-  tDecoderClear(&decoder);
 
-  mDebug("receive stream task consensus-checkpoint msg, vgId:%d, s-task:0x%" PRIx64 "-0x%x, checkpointId:%" PRId64,
-         req.nodeId, req.streamId, req.taskId, req.checkpointId);
+  return chkId;
+}
+
+// Send a response of msgSize bytes whose SMsgHead carries vgId (network byte order) and the given
+// code, then clear pInfo->handle so that no automatic response is sent for the same request.
+// When the response buffer cannot be allocated nothing is sent and the handle is left untouched.
+static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
+  SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};
+  rsp.pCont = rpcMallocCont(rsp.contLen);
+  if (rsp.pCont == NULL) {
+    mError("vgId:%d failed to alloc %d bytes for quick rsp, code:0x%x", vgId, msgSize, code);
+    return;
+  }
+
+  SMsgHead *pHead = rsp.pCont;
+  pHead->vgId = htonl(vgId);
+
+  tmsgSendRsp(&rsp);
+  pInfo->handle = NULL;  // disable auto rsp
+}
+
+//static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
+//  SMnode *pMnode = pMsg->info.node;
+//  SDecoder decoder = {0};
+//
+//  SRestoreCheckpointInfo req = {0};
+//  tDecoderInit(&decoder, pMsg->pCont, pMsg->contLen);
+//
+//  if (tDecodeRestoreCheckpointInfo(&decoder, &req)) {
+//    tDecoderClear(&decoder);
+//    terrno = TSDB_CODE_INVALID_MSG;
+//    mError("invalid task consensus-checkpoint msg received");
+//    return -1;
+//  }
+//  tDecoderClear(&decoder);
+//
+//  mDebug("receive stream task consensus-checkpoint msg, vgId:%d, s-task:0x%" PRIx64 "-0x%x, checkpointId:%" PRId64,
+//         req.nodeId, req.streamId, req.taskId, req.checkpointId);
+//
+//  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
+//  taosThreadMutexLock(&execInfo.lock);
+//
+//  // mnode handle the create stream transaction too slow may cause this problem
+//  SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
+//  if (pStream == NULL) {
+//    mWarn("failed to find the stream:0x%" PRIx64 ", not handle consensus-checkpointId", req.streamId);
+//
+//    // not in meta-store yet, try to acquire the task in exec buffer
+//    // the checkpoint req arrives too soon before the completion of the create stream trans.
+// STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; +// void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); +// if (p == NULL) { +// mError("failed to find the stream:0x%" PRIx64 " in buf, not handle consensus-checkpointId", req.streamId); +// terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; +// taosThreadMutexUnlock(&execInfo.lock); +// +// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); +// return -1; +// } else { +// mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", +// req.streamId, req.taskId); +// // todo wait for stream is created +// } +// } +// +// mInfo("vgId:%d stream:0x%" PRIx64 " %s meta-stored checkpointId:%" PRId64, req.nodeId, req.streamId, pStream->name, +// pStream->checkpointId); +// +// int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); +// if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly +// taosThreadMutexUnlock(&execInfo.lock); +// mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, 0, req.startTs); +// +// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); +// return TSDB_CODE_SUCCESS; +// } +// +// int32_t num = 0; +// int64_t chkId = getConsensusId(req.streamId, numOfTasks, &num); +// +// // some tasks not send hbMsg to mnode yet, wait for 5s. 
+//  if (chkId == -1) {
+//    mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", req.taskId, num,
+//           numOfTasks);
+//    SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks);
+//    mndAddConsensusTasks(pInfo, &req);
+//
+//    taosThreadMutexUnlock(&execInfo.lock);
+//    doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
+//    return 0;
+//  }
+//
+//  if (chkId == req.checkpointId) {
+//    mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64 ", meta-stored checkpointId:%" PRId64,
+//           req.nodeId, req.streamId, pStream->name, chkId, pStream->checkpointId);
+//    mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, chkId, req.startTs);
+//
+//    taosThreadMutexUnlock(&execInfo.lock);
+//    doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
+//    return 0;
+//  }
+//
+//  // wait for 5s and check again
+//  SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks);
+//  mndAddConsensusTasks(pInfo, &req);
+//
+//  if (pStream != NULL) {
+//    mndReleaseStream(pMnode, pStream);
+//  }
+//
+//  taosThreadMutexUnlock(&execInfo.lock);
+//  doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
+//  return 0;
+//}
+
+// Handler of TDMT_MND_STREAM_CONSEN_TIMER: walk every stream that has pending consensus-checkpointId
+// requests in execInfo.pStreamConsensus and, for each pending task entry, ask getConsensusId() for the
+// candidate id. When all reporting tasks agree, or the entry has waited for at least 10s, a
+// set-consensus-checkpointId transaction is created for that task and the entry is dropped.
+// Streams whose pending list becomes empty, and streams that no longer exist, are removed from the
+// hash only after the iteration has finished. Nothing is processed while the vgroup snapshot reports
+// "not all ready". Always returns 0 (TSDB_CODE_SUCCESS).
+static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
+  SMnode *pMnode = pMsg->info.node;
+  int64_t now = taosGetTimestampMs();
+  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
+
+  mDebug("start to process consensus-checkpointId in tmr");
+
+  bool    allReady = true;
+  SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
+  taosArrayDestroy(pNodeSnapshot);
+  if (!allReady) {
+    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
+    taosArrayDestroy(pStreamList);
+    return 0;
+  }
 
-  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
   taosThreadMutexLock(&execInfo.lock);
 
-  SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
-  if (pStream == NULL) {
-    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);
+  void *pIter = NULL;
+  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
+    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;
 
-    // not in meta-store yet, try to acquire the task in exec buffer
-    // the checkpoint req arrives too soon before the completion of the create stream trans.
-    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
-    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
-    if (p == NULL) {
-      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
-      terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
-      taosThreadMutexUnlock(&execInfo.lock);
-      return -1;
-    } else {
-      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
-             req.streamId, req.taskId);
+    int32_t num = taosArrayGetSize(pInfo->pTaskList);
+    SArray *pList = taosArrayInit(4, sizeof(int32_t));  // taskIds handled in this round
+
+    SStreamObj *pStream = mndGetStreamObj(pMnode, pInfo->streamId);
+    if (pStream == NULL) {  // stream has been dropped already
+      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
+      // the pending requests can never be served: tear the entry down with the same two steps used
+      // below for an emptied list, otherwise it stays in the hash and is revisited on every tick
+      mndClearConsensusRspEntry(pInfo);
+      taosArrayPush(pStreamList, &pInfo->streamId);
+      taosArrayDestroy(pList);
+      continue;
     }
-  }
 
-  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
+    for (int32_t j = 0; j < num; ++j) {
+      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
 
-  SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId);
+      int32_t existed = 0;
+      bool    allSame = true;
+      int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
+      if (chkId == -1) {
+        mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
+               pInfo->numOfTasks, pe->req.taskId);
+        break;
+      }
 
-  int64_t ckId = mndGetConsensusCheckpointId(pInfo, pStream);
-  if (ckId != -1) {  // consensus checkpoint id already exist
-    SRpcMsg rsp = {0};
-    rsp.code = 0;
-    rsp.info = pReq->info;
-    rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead);
-    rsp.pCont = rpcMallocCont(rsp.contLen);
+      if (((now - pe->ts) >= 10 * 1000) || allSame) {
+        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
+               pe->req.startTs, (now - pe->ts) / 1000.0);
+        ASSERT(chkId <= pe->req.checkpointId);
+        mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
 
-    SMsgHead *pHead = rsp.pCont;
-    pHead->vgId = htonl(req.nodeId);
-
-    mDebug("stream:0x%" PRIx64 " consensus-checkpointId:%" PRId64 " exists, return directly", req.streamId, ckId);
-    doSendConsensusCheckpointRsp(&req, &rsp, ckId);
-
-    taosThreadMutexUnlock(&execInfo.lock);
-    pReq->info.handle = NULL;  // disable auto rsp
-
-    return TSDB_CODE_SUCCESS;
-  }
-
-  mndAddConsensusTasks(pInfo, &req, pReq);
-
-  int32_t total = 0;
-  if (mndAllTaskSendCheckpointId(pInfo, numOfTasks, &total)) {  // all tasks has send the reqs
-    // start transaction to set the checkpoint id
-    int64_t checkpointId = mndGetConsensusCheckpointId(pInfo, pStream);
-    mInfo("stream:0x%" PRIx64 " %s all %d tasks send latest checkpointId, the consensus-checkpointId is:%" PRId64
-          " will be issued soon",
-          req.streamId, pStream->name, numOfTasks, checkpointId);
-
-    // start the checkpoint consensus trans
-    int32_t code = mndSendConsensusCheckpointIdRsp(pInfo->pTaskList, checkpointId);
-    if (code == TSDB_CODE_SUCCESS) {
-      mndClearConsensusRspEntry(pInfo);
-      mDebug("clear all waiting for rsp entry for stream:0x%" PRIx64, req.streamId);
-    } else {
-      mDebug("stream:0x%" PRIx64 " not start send consensus-checkpointId msg, due to not all task ready", req.streamId);
+        taosArrayPush(pList, &pe->req.taskId);
+      } else {
+        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
+               pe->req.startTs, (now - pe->ts) / 1000.0);
+      }
     }
-  } else {
-    mDebug("stream:0x%" PRIx64 " %d/%d tasks send consensus-checkpointId info", req.streamId, total, numOfTasks);
-  }
 
-  if (pStream != NULL) {
     mndReleaseStream(pMnode, pStream);
+
+    // drop the entries handled above; done after the scan so the indices used by the scan stay valid
+    if (taosArrayGetSize(pList) > 0) {
+      for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
+        int32_t *taskId = taosArrayGet(pList, i);
+        for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) {
+          SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k);
+          if (pe->req.taskId == *taskId) {
+            taosArrayRemove(pInfo->pTaskList, k);
+            break;
+          }
+        }
+      }
+    }
+
+    taosArrayDestroy(pList);
+
+    if (taosArrayGetSize(pInfo->pTaskList) == 0) {
+      mndClearConsensusRspEntry(pInfo);
+      // use the id stored in the entry itself: it is valid even when the list was already empty on entry
+      taosArrayPush(pStreamList, &pInfo->streamId);
+    }
+  }
+
+  // remove finished streams only now, so the hash is not modified while it is being iterated
+  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
+    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
+    mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
   }
 
   taosThreadMutexUnlock(&execInfo.lock);
-  pReq->info.handle = NULL;  // disable auto rsp
-  return 0;
+  taosArrayDestroy(pStreamList);
+  mDebug("end to process consensus-checkpointId in tmr");
+  return TSDB_CODE_SUCCESS;
 }
 
 static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
diff --git 
a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index c7f97b4a62..1452ac77d2 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -246,7 +246,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } tDecoderClear(&decoder); - mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId); + mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId); pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask)); @@ -284,6 +284,23 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { continue; } + STaskCkptInfo *pChkInfo = &p->checkpointInfo; + if (pChkInfo->consensusChkptId != 0) { + SRestoreCheckpointInfo cp = { + .streamId = p->id.streamId, + .taskId = p->id.taskId, + .checkpointId = p->checkpointInfo.latestId, + .startTs = pChkInfo->consensusTs, + }; + + SStreamObj *pStream = mndGetStreamObj(pMnode, p->id.streamId); + int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + + SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks); + mndAddConsensusTasks(pInfo, &cp); + mndReleaseStream(pMnode, pStream); + } + if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) { updateStageInfo(pTaskEntry, p->stage); if (pTaskEntry->nodeId == SNODE_HANDLE) { @@ -292,7 +309,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } else { streamTaskStatusCopy(pTaskEntry, p); - STaskCkptInfo *pChkInfo = &p->checkpointInfo; if ((pChkInfo->activeId != 0) && pChkInfo->failed) { mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, pChkInfo->activeId, pChkInfo->activeTransId); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 8fb5bc8a99..c4adbd0fc3 100644 --- 
a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -84,9 +84,10 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SVgObj *pVgroup = NULL; + int32_t replica = -1; // do the replica check *allReady = true; - SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry)); + SArray *pVgroupList = taosArrayInit(4, sizeof(SNodeEntry)); while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); @@ -97,6 +98,17 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime}; entry.epset = mndGetVgroupEpset(pMnode, pVgroup); + if (replica == -1) { + replica = pVgroup->replica; + } else { + if (replica != pVgroup->replica) { + mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations", + pVgroup->vgId, pVgroup->replica, replica); + *allReady = false; + break; + } + } + // if not all ready till now, no need to check the remaining vgroups. 
if (*allReady) { for (int32_t i = 0; i < pVgroup->replica; ++i) { @@ -107,8 +119,10 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { } ESyncState state = pVgroup->vnodeGid[i].syncState; - if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) { - mInfo("vgId:%d offline/err, not ready for checkpoint or other operations", pVgroup->vgId); + if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER || + state == TAOS_SYNC_STATE_CANDIDATE) { + mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", + pVgroup->vgId, state); *allReady = false; break; } @@ -119,7 +133,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { epsetToStr(&entry.epset, buf, tListLen(buf)); mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); - taosArrayPush(pVgroupListSnapshot, &entry); + taosArrayPush(pVgroupList, &entry); sdbRelease(pSdb, pVgroup); } @@ -138,11 +152,11 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { epsetToStr(&entry.epset, buf, tListLen(buf)); mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); - taosArrayPush(pVgroupListSnapshot, &entry); + taosArrayPush(pVgroupList, &entry); sdbRelease(pSdb, pObj); } - return pVgroupListSnapshot; + return pVgroupList; } SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { @@ -637,6 +651,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) { void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { taosThreadMutexLock(&pExecNode->lock); + // 1. 
remove task entries SStreamTaskIter *pIter = createStreamTaskIter(pStream); while (streamTaskIterNextTask(pIter)) { SStreamTask *pTask = streamTaskIterGetCurrent(pIter); @@ -646,8 +661,11 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { } ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); - taosThreadMutexUnlock(&pExecNode->lock); + // 2. remove stream entry in consensus hash table + mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); + + taosThreadMutexUnlock(&pExecNode->lock); destroyStreamTaskIter(pIter); } @@ -821,50 +839,113 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; } -int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg, int64_t checkpointId) { +static int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) { + SRestoreCheckpointInfo req = { + .taskId = pTask->id.taskId, + .streamId = pTask->id.streamId, + .checkpointId = checkpointId, + .startTs = ts, + .nodeId = pTask->info.nodeId, + .transId = pTrans->id, + }; + int32_t code = 0; int32_t blen; - - SRestoreCheckpointInfoRsp req = { - .streamId = pInfo->streamId, .taskId = pInfo->taskId, .checkpointId = checkpointId, .startTs = pInfo->startTs}; - - tEncodeSize(tEncodeRestoreCheckpointInfoRsp, &req, blen, code); + tEncodeSize(tEncodeRestoreCheckpointInfo, &req, blen, code); if (code < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } int32_t tlen = sizeof(SMsgHead) + blen; - void *abuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + + void *pBuf = taosMemoryMalloc(tlen); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); - tEncodeRestoreCheckpointInfoRsp(&encoder, &req); + tEncodeRestoreCheckpointInfo(&encoder, &req); - SMsgHead *pMsgHead = (SMsgHead 
*)pMsg->pCont; + SMsgHead *pMsgHead = (SMsgHead *)pBuf; pMsgHead->contLen = htonl(tlen); - pMsgHead->vgId = htonl(pInfo->nodeId); + pMsgHead->vgId = htonl(pTask->info.nodeId); + tEncoderClear(&encoder); - tmsgSendRsp(pMsg); + SEpSet epset = {0}; + bool hasEpset = false; + code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS || !hasEpset) { + taosMemoryFree(pBuf); + return code; + } + + code = setTransAction(pTrans, pBuf, tlen, TDMT_STREAM_CONSEN_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pBuf); + } + return code; } -int32_t mndSendConsensusCheckpointIdRsp(SArray* pInfoList, int64_t checkpointId) { - for(int32_t i = 0; i < taosArrayGetSize(pInfoList); ++i) { - SCheckpointConsensusEntry* pInfo = taosArrayGet(pInfoList, i); - doSendConsensusCheckpointRsp(&pInfo->req, &pInfo->rsp, checkpointId); +int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, + int64_t ts) { + char msg[128] = {0}; + snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId); + + STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg); + if (pTrans == NULL) { + return terrno; } - return 0; + + STaskId id = {.streamId = pStream->uid, .taskId = taskId}; + SStreamTask *pTask = mndGetStreamTask(&id, pStream); + ASSERT(pTask); + + /*int32_t code = */ mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid); + int32_t code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts); + if (code != 0) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return code; + } + + code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); + if (code != TSDB_CODE_SUCCESS) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + 
mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + + return TSDB_CODE_ACTION_IN_PROGRESS; } -SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId) { +SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks) { void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); if (pInfo != NULL) { return (SCheckpointConsensusInfo*)pInfo; } SCheckpointConsensusInfo p = { - .genTs = -1, .checkpointId = -1, .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry))}; + .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)), + .numOfTasks = numOfTasks, + .streamId = streamId, + }; + taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); void* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId)); @@ -873,87 +954,27 @@ SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId) // no matter existed or not, add the request into info list anyway, since we need to send rsp mannually // discard the msg may lead to the lost of connections. 
-void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg) { - SCheckpointConsensusEntry info = {0}; +void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo) { + SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()}; memcpy(&info.req, pRestoreInfo, sizeof(info.req)); - info.rsp.code = 0; - info.rsp.info = pMsg->info; - info.rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead); - info.rsp.pCont = rpcMallocCont(info.rsp.contLen); - - SMsgHead *pHead = info.rsp.pCont; - pHead->vgId = htonl(pRestoreInfo->nodeId); + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) { + SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i); + if (p->req.taskId == info.req.taskId) { + mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64 + "->%" PRId64 " total existed:%d", + pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, + (int32_t)taosArrayGetSize(pInfo->pTaskList)); + p->req.startTs = info.req.startTs; + return; + } + } taosArrayPush(pInfo->pTaskList, &info); -} - -static int32_t entryComparFn(const void* p1, const void* p2) { - const SCheckpointConsensusEntry* pe1 = p1; - const SCheckpointConsensusEntry* pe2 = p2; - - if (pe1->req.taskId == pe2->req.taskId) { - return 0; - } - - return pe1->req.taskId < pe2->req.taskId? 
-1:1; -} - -bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo* pInfo, int32_t numOfTasks, int32_t* pTotal) { - int32_t numOfExisted = taosArrayGetSize(pInfo->pTaskList); - if (numOfExisted < numOfTasks) { - if (pTotal != NULL) { - *pTotal = numOfExisted; - } - return false; - } - - taosArraySort(pInfo->pTaskList, entryComparFn); - - int32_t num = 1; - int32_t taskId = ((SCheckpointConsensusEntry*)taosArrayGet(pInfo->pTaskList, 0))->req.taskId; - for(int32_t i = 1; i < taosArrayGetSize(pInfo->pTaskList); ++i) { - SCheckpointConsensusEntry* pe = taosArrayGet(pInfo->pTaskList, i); - if (pe->req.taskId != taskId) { - num += 1; - taskId = pe->req.taskId; - } - } - - if (pTotal != NULL) { - *pTotal = num; - } - - ASSERT(num <= numOfTasks); - return num == numOfTasks; -} - -int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) { - if (pInfo->genTs > 0) { // there is no checkpoint ever generated if the checkpointId is 0. - mDebug("existed consensus-checkpointId:%" PRId64 " for stream:0x%" PRIx64 " %s exist, and return", - pInfo->checkpointId, pStream->uid, pStream->name); - return pInfo->checkpointId; - } - - int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); - if (!mndAllTaskSendCheckpointId(pInfo, numOfTasks, NULL)) { - return -1; - } - - int64_t checkpointId = INT64_MAX; - - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) { - SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, i); - if (pEntry->req.checkpointId < checkpointId) { - checkpointId = pEntry->req.checkpointId; - mTrace("stream:0x%" PRIx64 " %s task:0x%x vgId:%d latest checkpointId:%" PRId64, pStream->uid, pStream->name, - pEntry->req.taskId, pEntry->req.nodeId, pEntry->req.checkpointId); - } - } - - pInfo->checkpointId = checkpointId; - pInfo->genTs = taosGetTimestampMs(); - return checkpointId; + int32_t num = taosArrayGetSize(pInfo->pTaskList); + mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, 
stream:0x%" PRIx64 + " waiting tasks:%d", + pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num); } void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) { @@ -968,15 +989,15 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { return TSDB_CODE_SUCCESS; } -int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId) { - void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); - ASSERT(pInfo == NULL); - - SCheckpointConsensusInfo p = {.genTs = taosGetTimestampMs(), .checkpointId = 0, .pTaskList = NULL}; - taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); - - SCheckpointConsensusInfo* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId)); - ASSERT(pChkptInfo->genTs > 0 && pChkptInfo->checkpointId == 0); - mDebug("s-task:0x%" PRIx64 " set the initial consensus-checkpointId:0", streamId); - return TSDB_CODE_SUCCESS; -} \ No newline at end of file +//int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId) { +// void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); +// ASSERT(pInfo == NULL); +// +// SCheckpointConsensusInfo p = {.genTs = taosGetTimestampMs(), .checkpointId = 0, .pTaskList = NULL}; +// taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); +// +// SCheckpointConsensusInfo* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId)); +// ASSERT(pChkptInfo->genTs > 0 && pChkptInfo->checkpointId == 0); +// mDebug("s-task:0x%" PRIx64 " set the initial consensus-checkpointId:0", streamId); +// return TSDB_CODE_SUCCESS; +//} \ No newline at end of file diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 9686fd3789..cfa24b2430 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -43,14 +43,14 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce char *p = streamTaskGetStatus(pTask)->name; 
if (pTask->info.fillHistory) { - sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + sndInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms", SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam); } else { - sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + sndInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, @@ -149,15 +149,15 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { case TDMT_VND_STREAM_TASK_UPDATE: return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); case TDMT_VND_STREAM_TASK_RESET: - return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg); + return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont); case TDMT_STREAM_TASK_PAUSE: return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont); case TDMT_STREAM_TASK_RESUME: return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false); case TDMT_STREAM_TASK_UPDATE_CHKPT: - return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont, pMsg->contLen); - case TDMT_MND_STREAM_CHKPT_CONSEN_RSP: - return tqStreamProcessConsensusChkptRsp(pSnode->pMeta, pMsg); + return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont); + case 
TDMT_STREAM_CONSEN_CHKPT: + return tqStreamTaskProcessConsenChkptIdReq(pSnode->pMeta, pMsg); default: ASSERT(0); } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 1bec226489..4a47e08730 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -298,6 +298,7 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen); +int32_t tqProcessTaskConsenChkptIdReq(STQ* pTq, SRpcMsg* pMsg); // sma int32_t smaInit(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0a64b9c165..ac57a003c5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1016,7 +1016,11 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { } int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { - return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg, msgLen); + return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg); +} + +int32_t tqProcessTaskConsenChkptIdReq(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamTaskProcessConsenChkptIdReq(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { @@ -1239,7 +1243,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg); + return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg->pCont); } int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) { @@ -1277,5 +1281,5 @@ int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg) { - return 
tqStreamProcessConsensusChkptRsp(pTq->pStreamMeta, pMsg); + return tqStreamProcessConsensusChkptRsp2(pTq->pStreamMeta, pMsg); } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 668e178d2d..1f3c049211 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -228,6 +228,9 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList); + + // send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode + streamTaskSendPreparedCheckpointsourceRsp(pTask); streamTaskResetStatus(pTask); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); @@ -264,7 +267,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId); } - // stop streamTaskStop(pTask); if (ppHTask != NULL) { streamTaskStop(*ppHTask); @@ -279,7 +281,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); - pMeta->startInfo.tasksWillRestart = 1; + if (restored) { + tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId); + pMeta->startInfo.tasksWillRestart = 1; + } if (updateTasks < numOfTasks) { tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, @@ -292,8 +297,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamMetaClearUpdateTaskList(pMeta); if (!restored) { - tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId); - pMeta->startInfo.tasksWillRestart = 0; + tqDebug("vgId:%d vnode restore not completed, not 
start all tasks", vgId); } else { tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId); #if 0 @@ -666,7 +670,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen return 0; } -int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen) { +int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) { SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg; int32_t vgId = pMeta->vgId; @@ -738,6 +742,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { streamMetaStartAllTasks(pMeta); } else { streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId); + pMeta->startInfo.restartCount = 0; streamMetaWUnLock(pMeta); tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId); } @@ -854,8 +859,8 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { return TSDB_CODE_SUCCESS; } -int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg->pCont; +int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) { + SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg; SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); if (pTask == NULL) { @@ -867,7 +872,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); taosThreadMutexLock(&pTask->lock); - streamTaskClearCheckInfo(pTask, true); // clear flag set during do checkpoint, and open inputQ for all upstream tasks @@ -882,9 +886,10 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamTaskSetStatusReady(pTask); } else if (pState->state == TASK_STATUS__UNINIT) { - tqDebug("s-task:%s start task by checking downstream 
tasks", pTask->id.idStr); - ASSERT(pTask->status.downstreamReady == 0); - tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId); +// tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); +// ASSERT(pTask->status.downstreamReady == 0); +// tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId); + tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); } else { tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); } @@ -1111,6 +1116,8 @@ int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { ret int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } +int32_t tqStreamProcessConsensusChkptRsp2(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } + int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont; @@ -1126,22 +1133,21 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return TSDB_CODE_SUCCESS; } -int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { +int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t vgId = pMeta->vgId; + int32_t code = 0; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); - SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; int64_t now = taosGetTimestampMs(); - SRestoreCheckpointInfoRsp req = {0}; + SRestoreCheckpointInfo req = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); - rsp.info.handle = NULL; - if (tDecodeRestoreCheckpointInfoRsp(&decoder, &req) < 0) { - // rsp.code = TSDB_CODE_MSG_DECODE_ERROR; // disable it temporarily - tqError("vgId:%d failed to decode restore task checkpointId, code:%s", vgId, 
tstrerror(rsp.code)); + if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) { + tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code)); tDecoderClear(&decoder); return TSDB_CODE_SUCCESS; } @@ -1150,7 +1156,7 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { - tqError("vgId:%d process restore checkpointId req, failed to acquire task:0x%x, it may have been dropped already", + tqError("vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, req.taskId); streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return TSDB_CODE_SUCCESS; @@ -1172,16 +1178,25 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { taosThreadMutexLock(&pTask->lock); ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId); + if (pTask->chkInfo.consensusTransId >= req.transId) { + tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", + pTask->id.idStr, vgId, pTask->chkInfo.consensusTransId, req.transId); + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; + } + if (pTask->chkInfo.checkpointId != req.checkpointId) { - tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId, - pTask->chkInfo.checkpointId, req.checkpointId); + tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64" transId:%d", pTask->id.idStr, vgId, + pTask->chkInfo.checkpointId, req.checkpointId, req.transId); pTask->chkInfo.checkpointId = req.checkpointId; tqSetRestoreVersionInfo(pTask); } else { - tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current checkpointId, no need to update", - pTask->id.idStr, vgId, req.checkpointId); + tqDebug("s-task:%s vgId:%d 
consensus-checkpointId:%" PRId64 " equals to current id, transId:%d not update", + pTask->id.idStr, vgId, req.checkpointId, req.transId); } + pTask->chkInfo.consensusTransId = req.transId; taosThreadMutexUnlock(&pTask->lock); if (pMeta->role == NODE_ROLE_LEADER) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 29104c6c12..fb09a46e75 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -630,6 +630,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg goto _err; } } break; + case TDMT_STREAM_CONSEN_CHKPT: { + if (pVnode->restored) { + tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg); + } + } break; case TDMT_STREAM_TASK_PAUSE: { if (pVnode->restored && vnodeIsLeader(pVnode) && tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { @@ -647,11 +652,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg tqProcessTaskResetReq(pVnode->pTq, pMsg); } } break; - case TDMT_MND_STREAM_CHKPT_CONSEN_RSP: { - if (pVnode->restored) { - tqProcessTaskConsensusChkptRsp(pVnode->pTq, pMsg); - } - } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 8778e3314a..226a06be7e 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -666,13 +666,18 @@ void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); streamTaskCompleteCheckRsp(pInfo, true, id); - addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); + + // not record the failed of the current task if try to close current vnode + // otherwise, the put of message operation may 
incur invalid read of message queue. + if (!pMeta->closeFlag) { + addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); + } streamMetaReleaseTask(pMeta, pTask); return; } - if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) { + if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); @@ -698,31 +703,31 @@ void rspMonitorFn(void* param, void* tmrId) { if (pStat->state == TASK_STATUS__UNINIT) { getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); + + numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); + numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); + + // fault tasks detected, not try anymore + ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == total); + if (numOfFault > 0) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug( + "s-task:%s status:%s vgId:%d all rsp. 
quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " + "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + + streamTaskCompleteCheckRsp(pInfo, false, id); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + streamMetaReleaseTask(pMeta, pTask); + + taosArrayDestroy(pNotReadyList); + taosArrayDestroy(pTimeoutList); + return; + } } else { // unexpected status stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); } - numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); - numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); - - // fault tasks detected, not try anymore - ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == total); - if (numOfFault > 0) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug( - "s-task:%s status:%s vgId:%d all rsp. 
quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " - "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); - - streamTaskCompleteCheckRsp(pInfo, false, id); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - streamMetaReleaseTask(pMeta, pTask); - - taosArrayDestroy(pNotReadyList); - taosArrayDestroy(pTimeoutList); - return; - } - // checking of downstream tasks has been stopped by other threads if (pInfo->stopCheckProcess == 1) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index bc973f17d7..cdb5bf0b50 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -405,12 +405,14 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { pTask->chkInfo.startTs = 0; // clear the recorded start time - - streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo); streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks + + taosThreadMutexLock(&pTask->chkInfo.pActiveInfo->lock); + streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo); if (clearChkpReadyMsg) { streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo); } + taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock); } int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) { @@ -447,14 +449,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - if (restored && (pStatus->state != TASK_STATUS__CK) && (pMeta->role == NODE_ROLE_LEADER)) { - stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" 
PRId64 - " failed", - pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId); - taosThreadMutexUnlock(&pTask->lock); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; - } - if (!restored) { // during restore procedure, do update checkpoint-info stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, @@ -1115,8 +1109,20 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { const char* id = pTask->id.idStr; SCheckpointInfo* pInfo = &pTask->chkInfo; - ASSERT(pTask->pBackend == NULL); + taosThreadMutexLock(&pTask->lock); + if (pTask->status.sendConsensusChkptId == true) { + stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_SUCCESS; + } else { + pTask->status.sendConsensusChkptId = true; + } + taosThreadMutexUnlock(&pTask->lock); + + ASSERT(pTask->pBackend == NULL); + pTask->status.requireConsensusChkptId = true; +#if 0 SRestoreCheckpointInfo req = { .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, @@ -1148,10 +1154,27 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { tEncoderClear(&encoder); SRpcMsg msg = {0}; - initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_CONSEN, buf, tlen); + initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, buf, tlen); stDebug("s-task:%s vgId:%d send latest checkpointId:%" PRId64 " to mnode to get the consensus checkpointId", id, vgId, pInfo->checkpointId); tmsgSendReq(&pTask->info.mnodeEpset, &msg); +#endif return 0; +} + +int32_t streamTaskSendPreparedCheckpointsourceRsp(SStreamTask* pTask) { + int32_t code = 0; + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + return code; + } + + taosThreadMutexLock(&pTask->lock); + SStreamTaskState* p = streamTaskGetStatus(pTask); + if (p->state == TASK_STATUS__CK) { + code = 
streamTaskSendCheckpointSourceRsp(pTask); + } + taosThreadMutexUnlock(&pTask->lock); + + return code; } \ No newline at end of file diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 83e73e8c88..617adaa016 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -813,9 +813,20 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { taosThreadMutexLock(&pActiveInfo->lock); SArray* pList = pActiveInfo->pReadyMsgList; + int32_t num = taosArrayGetSize(pList); + + // active checkpoint info is cleared for now + if ((pActiveInfo->activeId == 0) && (pActiveInfo->transId == 0) && (num == 0) && (pTask->chkInfo.startTs == 0)) { + taosThreadMutexUnlock(&pActiveInfo->lock); + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t)); - int32_t num = taosArrayGetSize(pList); ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num); for (int32_t i = 0; i < num; ++i) { diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index d6411e25f2..16cb23de10 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -168,7 +168,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { continue; } + taosThreadMutexLock(&(*pTask)->lock); STaskStatusEntry entry = streamTaskGetStatusEntry(*pTask); + taosThreadMutexUnlock(&(*pTask)->lock); entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { @@ -192,6 +194,12 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { } } + if ((*pTask)->status.requireConsensusChkptId) { + entry.checkpointInfo.consensusChkptId = 1; + 
(*pTask)->status.requireConsensusChkptId = false; + stDebug("s-task:%s vgId:%d set the require consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId); + } + if ((*pTask)->exec.pWalReader != NULL) { entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1; if (entry.processedVer < 0) { @@ -324,7 +332,7 @@ int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) { stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId); SMetaHbInfo* pInfo = pMeta->pHbInfo; - streamMetaRLock(pMeta); + streamMetaWLock(pMeta); // current waiting rsp recved if (pRsp->msgId == pInfo->hbCount) { @@ -337,6 +345,6 @@ int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) { stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId); } - streamMetaRUnLock(pMeta); + streamMetaWUnLock(pMeta); return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7b94f642e2..e7fdb7ae2a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1031,10 +1031,11 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) { taosHashClear(pStartInfo->pFailedTaskSet); pStartInfo->tasksWillRestart = 0; pStartInfo->readyTs = 0; + pStartInfo->elapsedTime = 0; // reset the sentinel flag value to be 0 pStartInfo->startAllTasks = 0; - stDebug("vgId:%d clear all start-all-task info", vgId); + stDebug("vgId:%d clear start-all-task info", vgId); } void streamMetaRLock(SStreamMeta* pMeta) { @@ -1198,7 +1199,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { continue; } - if ((pTask->pBackend == NULL) && (pTask->info.fillHistory == 1 || HAS_RELATED_FILLHISTORY_TASK(pTask))) { + if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) { code = pMeta->expandTaskFn(pTask); if (code != TSDB_CODE_SUCCESS) 
{ stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId); @@ -1392,17 +1393,24 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 streamMetaWLock(pMeta); - if (pStartInfo->startAllTasks != 1) { - int64_t el = endTs - startTs; - stDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms", - pMeta->vgId, taskId, ready, el); + SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (p == NULL) { // task does not exists in current vnode, not record the complete info + stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId); streamMetaWUnLock(pMeta); return 0; } - void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (p == NULL) { // task does not exists in current vnode, not record the complete info - stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId); + // clear the send consensus-checkpointId flag + taosThreadMutexLock(&(*p)->lock); + (*p)->status.sendConsensusChkptId = false; + taosThreadMutexUnlock(&(*p)->lock); + + if (pStartInfo->startAllTasks != 1) { + int64_t el = endTs - startTs; + stDebug( + "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed " + "time:%" PRId64 "ms", + pMeta->vgId, taskId, ready, el); streamMetaWUnLock(pMeta); return 0; } diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index e0435156e2..40582b5144 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -349,6 +349,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1; if (tEncodeI64(pEncoder, ps->checkpointInfo.latestSize) < 0) return -1; if (tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup) < 0) return -1; + if (tEncodeI8(pEncoder, 
ps->checkpointInfo.consensusChkptId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.consensusTs) < 0) return -1; if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1; if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1; if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1; @@ -403,6 +405,8 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1; if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize) < 0) return -1; if (tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup) < 0) return -1; + if (tDecodeI8(pDecoder, &entry.checkpointInfo.consensusChkptId) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.consensusTs) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1; @@ -634,6 +638,7 @@ int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpoi if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; tEndEncode(pEncoder); @@ -645,28 +650,9 @@ int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* if (tDecodeI64(pDecoder, &pReq->startTs) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; tEndDecode(pDecoder); return 0; } - -int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const 
SRestoreCheckpointInfoRsp* pInfo) { - if (tStartEncode(pCoder) < 0) return -1; - if (tEncodeI64(pCoder, pInfo->startTs) < 0) return -1; - if (tEncodeI64(pCoder, pInfo->streamId) < 0) return -1; - if (tEncodeI32(pCoder, pInfo->taskId) < 0) return -1; - if (tEncodeI64(pCoder, pInfo->checkpointId) < 0) return -1; - tEndEncode(pCoder); - return 0; -} - -int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo) { - if (tStartDecode(pCoder) < 0) return -1; - if (tDecodeI64(pCoder, &pInfo->startTs) < 0) return -1; - if (tDecodeI64(pCoder, &pInfo->streamId) < 0) return -1; - if (tDecodeI32(pCoder, &pInfo->taskId) < 0) return -1; - if (tDecodeI64(pCoder, &pInfo->checkpointId) < 0) return -1; - tEndDecode(pCoder); - return 0; -} \ No newline at end of file diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 1decfe198a..9bcad87264 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -268,13 +268,7 @@ void tFreeStreamTask(SStreamTask* pTask) { } streamTaskCleanupCheckInfo(&pTask->taskCheckInfo); - - if (pTask->pState) { - stDebug("s-task:0x%x start to free task state", taskId); - streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING); - taskDbRemoveRef(pTask->pBackend); - pTask->pBackend = NULL; - } + streamFreeTaskState(pTask, status1); if (pTask->pNameMap) { tSimpleHashCleanup(pTask->pNameMap); @@ -311,6 +305,16 @@ void tFreeStreamTask(SStreamTask* pTask) { stDebug("s-task:0x%x free task completed", taskId); } +void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status) { + if (pTask->pState != NULL) { + stDebug("s-task:0x%x start to free task state", pTask->id.taskId); + streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); + taskDbRemoveRef(pTask->pBackend); + pTask->pBackend = NULL; + pTask->pState = NULL; + } +} + static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) { SCheckpointInfo* pChkInfo = &pTask->chkInfo; 
SDataRange* pRange = &pTask->dataRange; @@ -848,6 +852,8 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) { .checkpointInfo.latestTime = pTask->chkInfo.checkpointTime, .checkpointInfo.latestSize = 0, .checkpointInfo.remoteBackup = 0, + .checkpointInfo.consensusChkptId = 0, + .checkpointInfo.consensusTs = taosGetTimestampMs(), .hTaskId = pTask->hTaskInfo.id.taskId, .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize), .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize), diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 85d3e0068a..f2bd99cdaf 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -79,6 +79,12 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv return 0; } +static int32_t stopTaskSuccFn(SStreamTask* pTask) { + SStreamTaskSM* pSM = pTask->status.pSM; + streamFreeTaskState(pTask, pSM->current.state); + return TSDB_CODE_SUCCESS; +} + int32_t streamTaskInitStatus(SStreamTask* pTask) { pTask->execInfo.checkTs = taosGetTimestampMs(); stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr, @@ -634,21 +640,21 @@ void doInitStateTransferTable(void) { // resume is completed by restore status of state-machine // stop related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = 
createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); // dropping related event