diff --git a/include/common/tmsg.h b/include/common/tmsg.h index eecfc3fe9a..a83aa4da44 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3506,12 +3506,6 @@ int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamR int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq); void tFreeMDropStreamReq(SMDropStreamReq* pReq); -typedef struct { - int64_t recoverObjUid; - int32_t taskId; - int32_t hasCheckPoint; -} SMVStreamGatherInfoReq; - typedef struct SVUpdateCheckpointInfoReq { SMsgHead head; int64_t streamId; @@ -3537,50 +3531,8 @@ typedef struct { int64_t suid; } SMqRebVgReq; -static FORCE_INLINE int tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq) { - if (tStartEncode(pCoder) < 0) return -1; - if (tEncodeI64(pCoder, pReq->leftForVer) < 0) return -1; - if (tEncodeI32(pCoder, pReq->vgId) < 0) return -1; - if (tEncodeI64(pCoder, pReq->oldConsumerId) < 0) return -1; - if (tEncodeI64(pCoder, pReq->newConsumerId) < 0) return -1; - if (tEncodeCStr(pCoder, pReq->subKey) < 0) return -1; - if (tEncodeI8(pCoder, pReq->subType) < 0) return -1; - if (tEncodeI8(pCoder, pReq->withMeta) < 0) return -1; - - if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { - if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; - } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { - if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; - if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; - } - tEndEncode(pCoder); - return 0; -} - -static FORCE_INLINE int tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq) { - if (tStartDecode(pCoder) < 0) return -1; - - if (tDecodeI64(pCoder, &pReq->leftForVer) < 0) return -1; - - if (tDecodeI32(pCoder, &pReq->vgId) < 0) return -1; - if (tDecodeI64(pCoder, &pReq->oldConsumerId) < 0) return -1; - if (tDecodeI64(pCoder, &pReq->newConsumerId) < 0) return -1; - if (tDecodeCStrTo(pCoder, pReq->subKey) < 0) return -1; - if (tDecodeI8(pCoder, &pReq->subType) < 0) return -1; - if (tDecodeI8(pCoder, &pReq->withMeta) < 0) return -1; - - if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { - if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; - } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { - if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; - if (!tDecodeIsEnd(pCoder)) { - if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; - } - } - - tEndDecode(pCoder); - return 0; -} +int32_t tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq); +int32_t tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq); typedef struct { char topic[TSDB_TOPIC_FNAME_LEN]; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index ad2c593f4e..c92649f1f7 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -250,7 +250,9 @@ TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL) - TD_CLOSE_MSG_SEG(TDMT_MND_MSG) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_CONSEN, "stream-chkpt-consen", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) + TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) @@ -331,7 +333,7 @@ TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) //1035 1036 TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT_READY, "stream-checkpoint-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL) //unused TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 4a8fc6260a..566e8dbbd8 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -29,6 +29,7 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); diff --git a/include/libs/stream/streammsg.h b/include/libs/stream/streamMsg.h similarity index 87% rename from include/libs/stream/streammsg.h rename to include/libs/stream/streamMsg.h index 5f6f93a4ff..b69032330d 100644 --- a/include/libs/stream/streammsg.h +++ b/include/libs/stream/streamMsg.h @@ -17,6 +17,7 @@ #define TDENGINE_STREAMMSG_H #include "tmsg.h" +#include "trpc.h" #ifdef __cplusplus extern "C" { @@ -162,6 +163,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint typedef struct SStreamHbMsg { int32_t vgId; + int32_t msgId; int32_t numOfTasks; SArray* pTaskStatus; // SArray SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. @@ -171,6 +173,11 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); void tCleanupStreamHbMsg(SStreamHbMsg* pMsg); +typedef struct { + SMsgHead head; + int32_t msgId; +} SMStreamHbRspMsg; + typedef struct SRetrieveChkptTriggerReq { SMsgHead head; int64_t streamId; @@ -204,6 +211,28 @@ typedef struct SCheckpointReport { int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq); int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq); +typedef struct SRestoreCheckpointInfo { + SMsgHead head; + int64_t startTs; + int64_t streamId; + int64_t checkpointId; // latest checkpoint id + int32_t taskId; + int32_t nodeId; +} SRestoreCheckpointInfo; + +int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); +int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq); + +typedef struct SRestoreCheckpointInfoRsp { + int64_t streamId; + int64_t checkpointId; + int64_t startTs; + int32_t taskId; +} SRestoreCheckpointInfoRsp; + +int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo); +int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo); + typedef struct { SMsgHead head; int64_t streamId; @@ -211,6 +240,12 @@ typedef struct { int32_t reqType; } SStreamTaskRunReq; +typedef struct SCheckpointConsensusEntry { + SRestoreCheckpointInfo req; + SRpcMsg rsp; + int64_t ts; +} SCheckpointConsensusEntry; + #ifdef __cplusplus } #endif diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d35b17126d..e98039d2fe 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -17,8 +17,9 @@ #define TDENGINE_TSTREAM_H #include "os.h" +#include "streamMsg.h" #include "streamState.h" -#include "streammsg.h" +#include "streamMsg.h" #include "tdatablock.h" #include "tdbInt.h" #include "tmsg.h" @@ -265,12 +266,13 @@ typedef struct SStreamTaskId { } SStreamTaskId; typedef struct SCheckpointInfo { - int64_t startTs; - int64_t checkpointId; // latest checkpoint id - int64_t checkpointVer; // latest checkpoint offset in wal - int64_t checkpointTime; // latest checkpoint time - int64_t processedVer; - int64_t nextProcessVer; // current offset in WAL, not serialize it + int64_t startTs; + int64_t checkpointId; // latest checkpoint id + int64_t checkpointVer; // latest checkpoint offset in wal + int64_t checkpointTime; // latest checkpoint time + int64_t processedVer; + int64_t nextProcessVer; // current offset in WAL, not serialize it + SActiveCheckpointInfo* pActiveInfo; int64_t msgVer; } SCheckpointInfo; @@ -523,7 +525,6 @@ typedef struct STaskUpdateEntry { } STaskUpdateEntry; typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); -typedef int32_t (*__stream_task_expand_fn)(struct SStreamTask* pTask); SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5); @@ -613,6 +614,12 @@ typedef struct SStreamTaskState { char* name; } SStreamTaskState; +typedef struct SCheckpointConsensusInfo { + SArray* pTaskList; + int64_t checkpointId; + int64_t genTs; +} SCheckpointConsensusInfo; + int32_t streamSetupScheduleTrigger(SStreamTask* pTask); // dispatch related @@ -704,6 +711,7 @@ void streamTaskSetRemoveBackendFiles(SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); +STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask); // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); @@ -746,14 +754,15 @@ void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta); -void streamMetaResetStartInfo(STaskStartInfo* pMeta); +void streamMetaResetStartInfo(STaskStartInfo* pMeta, int32_t vgId); SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta); void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader); void streamMetaLoadAllTasks(SStreamMeta* pMeta); -int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn fn); +int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); -int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn fn); +int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); bool streamMetaAllTasksReady(const SStreamMeta* pMeta); +int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask); // timer tmr_h streamTimerGetInstance(); @@ -790,6 +799,9 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq); int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* req); void streamTaskSendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp); +int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp); + + #ifdef __cplusplus } #endif diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 611891e298..836a171ad7 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -828,7 +828,6 @@ TEST(clientCase, projection_query_tables) { // printf("error in create db, reason:%s\n", taos_errstr(pRes)); // } // taos_free_result(pRes); - pRes= taos_query(pConn, "use abc1"); taos_free_result(pRes); diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index ac94625f8e..d0b10b7f41 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -151,9 +151,9 @@ void startRsync() { // start rsync service to backup checkpoint code = system(cmd); if (code != 0) { - uError("[rsync] start server failed, code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + uError("[rsync] cmd:%s start server failed, code:%d," ERRNO_ERR_FORMAT, cmd, code, ERRNO_ERR_DATA); } else { - uDebug("[rsync] start server successful"); + uInfo("[rsync] cmd:%s start server successful", cmd); } } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1d66a8c323..10719674f5 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9296,6 +9296,52 @@ int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) { } void tDeleteSTqCheckInfo(STqCheckInfo *pInfo) { taosArrayDestroy(pInfo->colIdList); } +int32_t tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq) { + if (tStartEncode(pCoder) < 0) return -1; + if (tEncodeI64(pCoder, pReq->leftForVer) < 0) return -1; + if (tEncodeI32(pCoder, pReq->vgId) < 0) return -1; + if (tEncodeI64(pCoder, pReq->oldConsumerId) < 0) return -1; + if (tEncodeI64(pCoder, pReq->newConsumerId) < 0) return -1; + if (tEncodeCStr(pCoder, pReq->subKey) < 0) return -1; + if (tEncodeI8(pCoder, pReq->subType) < 0) return -1; + if (tEncodeI8(pCoder, pReq->withMeta) < 0) return -1; + + if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { + if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; + } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { + if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; + if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; + } + tEndEncode(pCoder); + return 0; +} + +int32_t tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeI64(pCoder, &pReq->leftForVer) < 0) return -1; + + if (tDecodeI32(pCoder, &pReq->vgId) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->oldConsumerId) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->newConsumerId) < 0) return -1; + if (tDecodeCStrTo(pCoder, pReq->subKey) < 0) return -1; + if (tDecodeI8(pCoder, &pReq->subType) < 0) return -1; + if (tDecodeI8(pCoder, &pReq->withMeta) < 0) return -1; + + if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { + if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; + } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { + if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; + if (!tDecodeIsEnd(pCoder)) { + if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; + } + } + + tEndDecode(pCoder); + return 0; +} + + int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) { int32_t nUid = taosArrayGetSize(pRes->uidList); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index e137bcbdec..677e19d4c1 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -242,6 +242,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 6a9f2f1275..7a0189b7c1 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -96,6 +96,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 6053c4db80..001696aecc 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -972,6 +972,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index dade7b4bdf..69f430778b 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -34,6 +34,7 @@ extern "C" { #define MND_STREAM_TASK_RESET_NAME "stream-task-reset" #define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" #define MND_STREAM_CHKPT_UPDATE_NAME "stream-chkpt-update" +#define MND_STREAM_CHKPT_CONSEN_NAME "stream-chkpt-consen" typedef struct SStreamTransInfo { int64_t startTime; @@ -61,6 +62,7 @@ typedef struct SStreamExecInfo { TdThreadMutex lock; SHashObj *pTransferStateStreams; SHashObj *pChkptStreams; + SHashObj *pStreamConsensus; } SStreamExecInfo; extern SStreamExecInfo execInfo; @@ -81,7 +83,7 @@ typedef struct SOrphanTask { typedef struct { SMsgHead head; -} SMStreamHbRspMsg, SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp; +} SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp; typedef struct STaskChkptInfo { int32_t nodeId; @@ -131,6 +133,8 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); +int32_t mndSendConsensusCheckpointIdRsp(SArray* pList, int64_t checkpointId); + void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); @@ -142,6 +146,14 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInf int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); +SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId); +void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg); +int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo *pInfo, SStreamObj *pStream); +bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo *pInfo, int32_t numOfTasks, int32_t* pTotal); +void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); +int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg, int64_t checkpointId); +int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3291f6e3ed..db428635a8 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -59,6 +59,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); +static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); @@ -117,6 +118,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_CONSEN, mndProcessConsensusCheckpointId); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); @@ -150,6 +152,7 @@ void mndCleanupStream(SMnode *pMnode) { taosHashCleanup(execInfo.transMgmt.pDBTrans); taosHashCleanup(execInfo.pTransferStateStreams); taosHashCleanup(execInfo.pChkptStreams); + taosHashCleanup(execInfo.pStreamConsensus); taosThreadMutexDestroy(&execInfo.lock); mDebug("mnd stream exec info cleanup"); } @@ -1233,6 +1236,9 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true); sdbRelease(pSdb, p); + // clear the consensus checkpoint info + mndClearConsensusCheckpointId(execInfo.pStreamConsensus, p->uid); + if (code != -1) { started += 1; @@ -2513,7 +2519,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { return 0; } -static void doAddTaskInfo(SArray *pList, SCheckpointReport *pReport) { +static void doAddReportStreamTask(SArray* pList, const SCheckpointReport* pReport) { bool existed = false; for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { STaskChkptInfo *p = taosArrayGet(pList, i); @@ -2584,12 +2590,12 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo)); - doAddTaskInfo(pList, &req); + doAddReportStreamTask(pList, &req); taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); } else { - doAddTaskInfo(*pReqTaskList, &req); + doAddReportStreamTask(*pReqTaskList, &req); } int32_t total = taosArrayGetSize(*pReqTaskList); @@ -2597,25 +2603,6 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64 " will be issued soon", req.streamId, pStream->name, total, req.checkpointId); - - // if (pStream != NULL) { - // bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); - // if (conflict) { - // mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId); - // } else { - // int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList); - // if (code == TSDB_CODE_SUCCESS) { // remove this entry - // taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); - // - // int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); - // mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId, - // numOfStreams); - // } else { - // mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", - // req.streamId); - // } - // } - // } } if (pStream != NULL) { @@ -2637,6 +2624,101 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { return 0; } +static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SDecoder decoder = {0}; + + SRestoreCheckpointInfo req = {0}; + tDecoderInit(&decoder, pReq->pCont, pReq->contLen); + + if (tDecodeRestoreCheckpointInfo(&decoder, &req)) { + tDecoderClear(&decoder); + terrno = TSDB_CODE_INVALID_MSG; + mError("invalid task consensus-checkpoint msg received"); + return -1; + } + tDecoderClear(&decoder); + + mDebug("receive stream task consensus-checkpoint msg, vgId:%d, s-task:0x%" PRIx64 "-0x%x, checkpointId:%" PRId64, + req.nodeId, req.streamId, req.taskId, req.checkpointId); + + // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. + taosThreadMutexLock(&execInfo.lock); + + SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); + if (pStream == NULL) { + mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId); + + // not in meta-store yet, try to acquire the task in exec buffer + // the checkpoint req arrives too soon before the completion of the create stream trans. + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (p == NULL) { + mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId); + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + taosThreadMutexUnlock(&execInfo.lock); + return -1; + } else { + mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", + req.streamId, req.taskId); + } + } + + int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); + + SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId); + int64_t ckId = mndGetConsensusCheckpointId(pInfo, pStream); + if (ckId != -1) { // consensus checkpoint id already exist + SRpcMsg rsp = {0}; + rsp.code = 0; + rsp.info = pReq->info; + rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead); + rsp.pCont = rpcMallocCont(rsp.contLen); + + SMsgHead *pHead = rsp.pCont; + pHead->vgId = htonl(req.nodeId); + + mDebug("stream:0x%" PRIx64 " consensus checkpointId:%" PRId64 " exists, return directly", req.streamId, ckId); + doSendConsensusCheckpointRsp(&req, &rsp, ckId); + + taosThreadMutexUnlock(&execInfo.lock); + pReq->info.handle = NULL; // disable auto rsp + + return TSDB_CODE_SUCCESS; + } + + mndAddConsensusTasks(pInfo, &req, pReq); + + int32_t total = 0; + if (mndAllTaskSendCheckpointId(pInfo, numOfTasks, &total)) { // all tasks has send the reqs + // start transaction to set the checkpoint id + int64_t checkpointId = mndGetConsensusCheckpointId(pInfo, pStream); + mInfo("stream:0x%" PRIx64 " %s all %d tasks send latest checkpointId, the consensus-checkpointId is:%" PRId64 + " will be issued soon", + req.streamId, pStream->name, numOfTasks, checkpointId); + + // start the checkpoint consensus trans + int32_t code = mndSendConsensusCheckpointIdRsp(pInfo->pTaskList, checkpointId); + if (code == TSDB_CODE_SUCCESS) { + mndClearConsensusRspEntry(pInfo); + mDebug("clear all waiting for rsp entry for stream:0x%" PRIx64, req.streamId); + } else { + mDebug("stream:0x%" PRIx64 " not start send consensus-checkpointId msg, due to not all task ready", req.streamId); + } + } else { + mDebug("stream:0x%" PRIx64 " %d/%d tasks send consensus-checkpointId info", req.streamId, total, numOfTasks); + } + + if (pStream != NULL) { + mndReleaseStream(pMnode, pStream); + } + + taosThreadMutexUnlock(&execInfo.lock); + pReq->info.handle = NULL; // disable auto rsp + + return 0; +} + static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) { int32_t code = mndProcessCreateStreamReq(pReq); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 5c1dd3ab7a..c7f97b4a62 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -22,9 +22,18 @@ typedef struct SFailedCheckpointInfo { int32_t transId; } SFailedCheckpointInfo; -static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode); +static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode); +static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage); +static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo); +static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId); +static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); +static int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList); +static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info); +static bool validateHbMsg(const SArray *pNodeList, int32_t vgId); +static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks); +static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId); -static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { +void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); for (int32_t j = 0; j < numOfNodes; ++j) { SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, j); @@ -39,7 +48,7 @@ static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { } } -static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) { +void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) { int32_t num = taosArrayGetSize(pList); for (int32_t i = 0; i < num; ++i) { SFailedCheckpointInfo *p = taosArrayGet(pList, i); @@ -86,7 +95,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) { +int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) { int32_t code = TSDB_CODE_SUCCESS; mndKillTransImpl(pMnode, transId, ""); @@ -110,7 +119,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in return code; } -static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { +int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { int32_t num = taosArrayGetSize(pNodeList); mInfo("set node expired for %d nodes", num); @@ -133,15 +142,14 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { } if (!setFlag) { - mError("failed to set nodeUpdate flag, nodeId:%d not exists in nodelist, update it", *pVgId); - ASSERT(0); + mError("failed to set nodeUpdate flag, nodeId:%d not exists in nodelist", *pVgId); return TSDB_CODE_FAILED; } } return TSDB_CODE_SUCCESS; } -static int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) { +int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) { SOrphanTask *pTask = taosArrayGet(pList, 0); // check if it is conflict with other trans in both sourceDb and targetDb. @@ -238,7 +246,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } tDecoderClear(&decoder); - mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); + mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId); pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask)); @@ -246,6 +254,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); mndInitStreamExecInfo(pMnode, &execInfo); + if (!validateHbMsg(execInfo.pNodeList, req.vgId)) { + mError("invalid hbMsg from vgId:%d, discarded", req.vgId); + + terrno = TSDB_CODE_INVALID_MSG; + doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); + + taosThreadMutexUnlock(&execInfo.lock); + cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); + return -1; + } int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { @@ -335,21 +353,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } taosThreadMutexUnlock(&execInfo.lock); - tCleanupStreamHbMsg(&req); - taosArrayDestroy(pFailedChkpt); - taosArrayDestroy(pOrphanTasks); - - { - SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)}; - rsp.pCont = rpcMallocCont(rsp.contLen); - SMsgHead *pHead = rsp.pCont; - pHead->vgId = htonl(req.vgId); - - tmsgSendRsp(&rsp); - pReq->info.handle = NULL; // disable auto rsp - } + terrno = TSDB_CODE_SUCCESS; + doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); + cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); return TSDB_CODE_SUCCESS; } @@ -360,4 +368,33 @@ void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doC SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pMsg, .contLen = size}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } -} \ No newline at end of file +} + +bool validateHbMsg(const SArray *pNodeList, int32_t vgId) { + for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { + SNodeEntry *pEntry = taosArrayGet(pNodeList, i); + if (pEntry->nodeId == vgId) { + return true; + } + } + + return false; +} + +void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks) { + tCleanupStreamHbMsg(pReq); + taosArrayDestroy(pFailedChkptList); + taosArrayDestroy(pOrphanTasks); +} + +void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) { + SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SMStreamHbRspMsg)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + + SMStreamHbRspMsg *pMsg = rsp.pCont; + pMsg->head.vgId = htonl(vgId); + pMsg->msgId = msgId; + + tmsgSendRsp(&rsp); + pRpcInfo->handle = NULL; // disable auto rsp +} diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index e47f28c309..98b9c7c0c7 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -575,10 +575,12 @@ void mndInitExecInfo() { execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + execInfo.pStreamConsensus = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry)); taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList); + taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList); } void removeExpiredNodeInfo(const SArray *pNodeSnapshot) { @@ -817,4 +819,150 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { taosArrayDestroy(pDropped); return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} + +int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg, int64_t checkpointId) { + int32_t code = 0; + int32_t blen; + + SRestoreCheckpointInfoRsp req = { + .streamId = pInfo->streamId, .taskId = pInfo->taskId, .checkpointId = checkpointId, .startTs = pInfo->startTs}; + + tEncodeSize(tEncodeRestoreCheckpointInfoRsp, &req, blen, code); + if (code < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + int32_t tlen = sizeof(SMsgHead) + blen; + void *abuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + tEncodeRestoreCheckpointInfoRsp(&encoder, &req); + + SMsgHead *pMsgHead = (SMsgHead *)pMsg->pCont; + pMsgHead->contLen = htonl(tlen); + pMsgHead->vgId = htonl(pInfo->nodeId); + tEncoderClear(&encoder); + + tmsgSendRsp(pMsg); + return code; +} + +int32_t mndSendConsensusCheckpointIdRsp(SArray* pInfoList, int64_t checkpointId) { + for(int32_t i = 0; i < taosArrayGetSize(pInfoList); ++i) { + SCheckpointConsensusEntry* pInfo = taosArrayGet(pInfoList, i); + doSendConsensusCheckpointRsp(&pInfo->req, &pInfo->rsp, checkpointId); + } + return 0; +} + +SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId) { + void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); + if (pInfo != NULL) { + return (SCheckpointConsensusInfo*)pInfo; + } + + SCheckpointConsensusInfo p = { + .genTs = -1, .checkpointId = -1, .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry))}; + taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); + + void* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId)); + return pChkptInfo; +} + +// no matter existed or not, add the request into info list anyway, since we need to send rsp mannually +// discard the msg may lead to the lost of connections. +void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg) { + SCheckpointConsensusEntry info = {0}; + memcpy(&info.req, pRestoreInfo, sizeof(info.req)); + + info.rsp.code = 0; + info.rsp.info = pMsg->info; + info.rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead); + info.rsp.pCont = rpcMallocCont(info.rsp.contLen); + + SMsgHead *pHead = info.rsp.pCont; + pHead->vgId = htonl(pRestoreInfo->nodeId); + + taosArrayPush(pInfo->pTaskList, &info); +} + +static int32_t entryComparFn(const void* p1, const void* p2) { + const SCheckpointConsensusEntry* pe1 = p1; + const SCheckpointConsensusEntry* pe2 = p2; + + if (pe1->req.taskId == pe2->req.taskId) { + return 0; + } + + return pe1->req.taskId < pe2->req.taskId? -1:1; +} + +bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo* pInfo, int32_t numOfTasks, int32_t* pTotal) { + int32_t numOfExisted = taosArrayGetSize(pInfo->pTaskList); + if (numOfExisted < numOfTasks) { + if (pTotal != NULL) { + *pTotal = numOfExisted; + } + return false; + } + + taosArraySort(pInfo->pTaskList, entryComparFn); + + int32_t num = 1; + int32_t taskId = ((SCheckpointConsensusEntry*)taosArrayGet(pInfo->pTaskList, 0))->req.taskId; + for(int32_t i = 1; i < taosArrayGetSize(pInfo->pTaskList); ++i) { + SCheckpointConsensusEntry* pe = taosArrayGet(pInfo->pTaskList, i); + if (pe->req.taskId != taskId) { + num += 1; + taskId = pe->req.taskId; + } + } + + if (pTotal != NULL) { + *pTotal = num; + } + + ASSERT(num <= numOfTasks); + return num == numOfTasks; +} + +int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) { + if (pInfo->genTs > 0) { + ASSERT(pInfo->checkpointId > 0); + return pInfo->checkpointId; + } + + int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + if (!mndAllTaskSendCheckpointId(pInfo, numOfTasks, NULL)) { + return -1; + } + + int64_t checkpointId = INT64_MAX; + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) { + SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, i); + if (pEntry->req.checkpointId < checkpointId) { + checkpointId = pEntry->req.checkpointId; + mTrace("stream:0x%" PRIx64 " %s task:0x%x vgId:%d latest checkpointId:%" PRId64, pStream->uid, pStream->name, + pEntry->req.taskId, pEntry->req.nodeId, pEntry->req.checkpointId); + } + } + + pInfo->checkpointId = checkpointId; + pInfo->genTs = taosGetTimestampMs(); + return checkpointId; +} + +void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) { + pInfo->pTaskList = taosArrayDestroy(pInfo->pTaskList); +} + +int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { + taosHashRemove(pHash, &streamId, sizeof(streamId)); + int32_t numOfStreams = taosHashGetSize(pHash); + mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list after new checkpoint generated, remain:%d", streamId, + numOfStreams); + return TSDB_CODE_SUCCESS; +} diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 3f7d3d3e8c..9686fd3789 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -25,7 +25,7 @@ #define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0) // clang-format on -int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) { +int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0); int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer); if (code != TSDB_CODE_SUCCESS) { @@ -71,8 +71,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { startRsync(); pSnode->msgCb = pOption->msgCb; - pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndExpandTask, tqExpandStreamTask, SNODE_HANDLE, - taosGetTimestampMs(), tqStartTaskCompleteCallback); + pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndBuildStreamTask, tqExpandStreamTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback); if (pSnode->pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; @@ -157,6 +156,8 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false); case TDMT_STREAM_TASK_UPDATE_CHKPT: return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont, pMsg->contLen); + case TDMT_MND_STREAM_CHKPT_CONSEN_RSP: + return tqStreamProcessConsensusChkptRsp(pSnode->pMeta, pMsg); default: ASSERT(0); } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 22b26498e4..1bec226489 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -262,6 +262,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4be5983be8..0a64b9c165 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -139,9 +139,11 @@ void tqClose(STQ* pTq) { taosHashCleanup(pTq->pCheckInfo); taosMemoryFree(pTq->path); tqMetaClose(pTq); + + int32_t vgId = pTq->pStreamMeta->vgId; streamMetaClose(pTq->pStreamMeta); - qDebug("end to close tq"); + qDebug("vgId:%d end to close tq", vgId); taosMemoryFree(pTq); } @@ -1030,7 +1032,6 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { - // return 0; } @@ -1274,3 +1275,7 @@ int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg); } + +int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamProcessConsensusChkptRsp(pTq->pStreamMeta, pMsg); +} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 40da0f0320..dabd4ff455 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -121,7 +121,6 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) { int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqDebug("vgId:%d no stream tasks existed to run", vgId); @@ -132,6 +131,26 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK); } +int32_t tqStreamTaskRestoreCheckpoint(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + int32_t vgId = pMeta->vgId; + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + if (numOfTasks == 0) { + tqDebug("vgId:%d no stream tasks existed to run", vgId); + return 0; + } + + tqDebug("vgId:%d restore task:0x%" PRIx64 "-0x%x checkpointId", vgId, streamId, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId); + if (pTask == NULL) { + tqError("failed to acquire task:0x%x when trying to restore checkpointId", taskId); + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + int32_t code = streamTaskSendRestoreChkptMsg(pTask); + streamMetaReleaseTask(pMeta, pTask); + return code; +} + int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -191,7 +210,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamMetaInitUpdateTaskList(pMeta, req.transId); } } else { - tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId); + tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d, recorded update transId:%d", idstr, + vgId, req.transId, pMeta->updateInfo.transId); } // duplicate update epset msg received, discard this redundant message @@ -660,8 +680,11 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored if (ppTask != NULL && (*ppTask) != NULL) { streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq); } else { // failed to get the task. - tqError("vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, it may have been dropped already", - vgId, pReq->taskId); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + tqError( + "vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been " + "dropped already", + vgId, pReq->taskId, numOfTasks); } streamMetaWUnLock(pMeta); @@ -712,9 +735,9 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { if (isLeader && !tsDisableStream) { streamMetaWUnLock(pMeta); - streamMetaStartAllTasks(pMeta, tqExpandStreamTask); + streamMetaStartAllTasks(pMeta); } else { - streamMetaResetStartInfo(&pMeta->startInfo); + streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId); streamMetaWUnLock(pMeta); tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId); } @@ -730,10 +753,10 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t vgId = pMeta->vgId; if (type == STREAM_EXEC_T_START_ONE_TASK) { - streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId, tqExpandStreamTask); + streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId); return 0; } else if (type == STREAM_EXEC_T_START_ALL_TASKS) { - streamMetaStartAllTasks(pMeta, tqExpandStreamTask); + streamMetaStartAllTasks(pMeta); return 0; } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) { restartStreamTasks(pMeta, isLeader); @@ -861,7 +884,7 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { } else if (pState->state == TASK_STATUS__UNINIT) { tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); ASSERT(pTask->status.downstreamReady == 0); - tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); + tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId); } else { tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); } @@ -1070,7 +1093,9 @@ int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) { return TSDB_CODE_SUCCESS; } -int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } +int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + return streamProcessHeartbeatRsp(pMeta, pMsg->pCont); +} int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } @@ -1087,6 +1112,71 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { } streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; +} + +int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; + int64_t now = taosGetTimestampMs(); + + SRestoreCheckpointInfoRsp req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, len); + + rsp.info.handle = NULL; + if (tDecodeRestoreCheckpointInfoRsp(&decoder, &req) < 0) { + // rsp.code = TSDB_CODE_MSG_DECODE_ERROR; // disable it temporarily + tqError("vgId:%d failed to decode restore task checkpointId, code:%s", vgId, tstrerror(rsp.code)); + tDecoderClear(&decoder); + return TSDB_CODE_SUCCESS; + } + + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); + if (pTask == NULL) { + tqError("vgId:%d process restore checkpointId req, failed to acquire task:0x%x, it may have been dropped already", + pMeta->vgId, req.taskId); + streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); + return TSDB_CODE_SUCCESS; + } + + // discard the rsp from before restart + if (req.startTs < pTask->execInfo.created) { + tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64 + " from task createTs:%" PRId64 ", discard", + pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs); + streamMetaAddFailedTaskSelf(pTask, now); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; + } + + tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode", + pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId); + + taosThreadMutexLock(&pTask->lock); + ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId); + + if ((pTask->chkInfo.checkpointId != req.checkpointId) && req.checkpointId != 0) { + tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId, + pTask->chkInfo.checkpointId, req.checkpointId); + pTask->chkInfo.checkpointId = req.checkpointId; + tqSetRestoreVersionInfo(pTask); + } + + taosThreadMutexUnlock(&pTask->lock); + + if (pMeta->role == NODE_ROLE_LEADER) { + /*code = */ tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId); + } else { + tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr); + } + streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2d05cc2e00..9006d36e65 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -647,6 +647,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg tqProcessTaskResetReq(pVnode->pTq, pMsg); } } break; + case TDMT_MND_STREAM_CHKPT_CONSEN_RSP: { + if (pVnode->restored) { + tqProcessTaskConsensusChkptRsp(pVnode->pTq, pMsg); + } + } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 4741745419..008d066717 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -65,6 +65,11 @@ struct SActiveCheckpointInfo { tmr_h pSendReadyMsgTmr; }; +struct SConsensusCheckpoint { + int8_t inProcess; + +}; + typedef struct { int8_t type; SSDataBlock* pBlock; @@ -157,6 +162,7 @@ extern void* streamTimer; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; extern int32_t taskDbWrapperId; +extern int32_t streamMetaId; int32_t streamTimerInit(); void streamTimerCleanUp(); @@ -211,7 +217,13 @@ void* streamQueueNextItem(SStreamQueue* pQueue); void streamFreeQitem(SStreamQueueItem* data); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); -void streamMetaRemoveDB(void* arg, char* key); +void streamMetaRemoveDB(void* arg, char* key); +void streamMetaHbToMnode(void* param, void* tmrId); +SMetaHbInfo* createMetaHbInfo(int64_t* pRid); +void* destroyMetaHbInfo(SMetaHbInfo* pInfo); +void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta); +void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount); +int32_t streamMetaSendHbHelper(SStreamMeta* pMeta); ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 06773c79e3..cf7852cddb 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1893,8 +1893,12 @@ void* taskDbAddRef(void* pTaskDb) { STaskDbWrapper* pBackend = pTaskDb; return taosAcquireRef(taskDbWrapperId, pBackend->refId); } + void taskDbRemoveRef(void* pTaskDb) { - if (pTaskDb == NULL) return; + if (pTaskDb == NULL) { + return; + } + STaskDbWrapper* pBackend = pTaskDb; taosReleaseRef(taskDbWrapperId, pBackend->refId); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 78bd5c4511..bc973f17d7 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -447,14 +447,13 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - // if (restored && (pStatus->state != TASK_STATUS__CK)) { - // stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" - // PRId64 - // " failed", - // pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId); - // taosThreadMutexUnlock(&pTask->lock); - // return TSDB_CODE_STREAM_TASK_IVLD_STATUS; - // } + if (restored && (pStatus->state != TASK_STATUS__CK) && (pMeta->role == NODE_ROLE_LEADER)) { + stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 + " failed", + pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId); + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } if (!restored) { // during restore procedure, do update checkpoint-info stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64 @@ -1108,3 +1107,51 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { s3DeleteObjects((const char**)&tmp, 1); return 0; } + +int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { + int32_t code; + int32_t tlen = 0; + int32_t vgId = pTask->pMeta->vgId; + const char* id = pTask->id.idStr; + SCheckpointInfo* pInfo = &pTask->chkInfo; + + ASSERT(pTask->pBackend == NULL); + + SRestoreCheckpointInfo req = { + .streamId = pTask->id.streamId, + .taskId = pTask->id.taskId, + .nodeId = vgId, + .checkpointId = pInfo->checkpointId, + .startTs = pTask->execInfo.created, + }; + + tEncodeSize(tEncodeRestoreCheckpointInfo, &req, tlen, code); + if (code < 0) { + stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code)); + return -1; + } + + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return -1; + } + + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + if ((code = tEncodeRestoreCheckpointInfo(&encoder, &req)) < 0) { + rpcFreeCont(buf); + stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, tstrerror(code)); + return -1; + } + tEncoderClear(&encoder); + + SRpcMsg msg = {0}; + initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_CONSEN, buf, tlen); + stDebug("s-task:%s vgId:%d send latest checkpointId:%" PRId64 " to mnode to get the consensus checkpointId", id, vgId, + pInfo->checkpointId); + + tmsgSendReq(&pTask->info.mnodeEpset, &msg); + return 0; +} \ No newline at end of file diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c new file mode 100644 index 0000000000..d6411e25f2 --- /dev/null +++ b/source/libs/stream/src/streamHb.c @@ -0,0 +1,342 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executor.h" +#include "streamInt.h" +#include "tmisce.h" +#include "tref.h" +#include "tstream.h" +#include "ttimer.h" +#include "wal.h" + +int32_t streamMetaId = 0; + +struct SMetaHbInfo { + tmr_h hbTmr; + int32_t stopFlag; + int32_t tickCounter; + int32_t hbCount; + int64_t hbStart; + int64_t msgSendTs; + SStreamHbMsg hbMsg; +}; + +static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { + if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter + pInfo->tickCounter = 0; + return true; + } + return false; +} + +static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) { + int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes); + for (int k = 0; k < numOfExisted; ++k) { + if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) { + return true; + } + } + return false; +} + +static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { + SStreamMeta* pMeta = pTask->pMeta; + + taosThreadMutexLock(&pTask->lock); + + int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); + for (int j = 0; j < num; ++j) { + SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j); + + bool exist = existInHbMsg(pMsg, pTaskEpset); + if (!exist) { + taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId); + stDebug("vgId:%d nodeId:%d added into hbMsg update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, + (int32_t)taosArrayGetSize(pMsg->pUpdateNodes)); + } + } + + taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList); + taosThreadMutexUnlock(&pTask->lock); +} + +static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* pEpset) { + int32_t code = 0; + int32_t tlen = 0; + + tEncodeSize(tEncodeStreamHbMsg, pMsg, tlen, code); + if (code < 0) { + stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); + return TSDB_CODE_FAILED; + } + + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_FAILED; + } + + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + if ((code = tEncodeStreamHbMsg(&encoder, pMsg)) < 0) { + rpcFreeCont(buf); + stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); + return TSDB_CODE_FAILED; + } + tEncoderClear(&encoder); + + stDebug("vgId:%d send hb to mnode, numOfTasks:%d msgId:%d", pMeta->vgId, pMsg->numOfTasks, pMsg->msgId); + + SRpcMsg msg = {0}; + initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); + tmsgSendReq(pEpset, &msg); + + return TSDB_CODE_SUCCESS; +} + +// NOTE: this task should be executed within the SStreamMeta lock region. +int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { + SEpSet epset = {0}; + bool hasMnodeEpset = false; + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + SMetaHbInfo* pInfo = pMeta->pHbInfo; + + // not recv the hb msg rsp yet, send current hb msg again + if (pInfo->msgSendTs > 0) { + stDebug("vgId:%d hbMsg rsp not recv, send current hbMsg, msgId:%d, total:%d again", pMeta->vgId, pInfo->hbMsg.msgId, + pInfo->hbCount); + + for(int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (pTask == NULL) { + continue; + } + + if ((*pTask)->info.fillHistory == 1) { + continue; + } + + epsetAssign(&epset, &(*pTask)->info.mnodeEpset); + break; + } + + pInfo->msgSendTs = taosGetTimestampMs(); + doSendHbMsgInfo(&pInfo->hbMsg, pMeta, &epset); + return TSDB_CODE_SUCCESS; + } + + SStreamHbMsg* pMsg = &pInfo->hbMsg; + stDebug("vgId:%d build stream hbMsg, leader:%d msgId:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER), + pMeta->pHbInfo->hbCount); + + pMsg->vgId = pMeta->vgId; + pMsg->msgId = pMeta->pHbInfo->hbCount; + + pMsg->pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); + pMsg->pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); + + if (pMsg->pTaskStatus == NULL || pMsg->pUpdateNodes == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); + + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (pTask == NULL) { + continue; + } + + // not report the status of fill-history task + if ((*pTask)->info.fillHistory == 1) { + continue; + } + + STaskStatusEntry entry = streamTaskGetStatusEntry(*pTask); + + entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); + if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { + entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate; + entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); + } + + SActiveCheckpointInfo* p = (*pTask)->chkInfo.pActiveInfo; + if (p->activeId != 0) { + entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0; + entry.checkpointInfo.activeId = p->activeId; + entry.checkpointInfo.activeTransId = p->transId; + + if (entry.checkpointInfo.failed) { + stInfo("s-task:%s set kill checkpoint trans in hbMsg, transId:%d, clear the active checkpointInfo", + (*pTask)->id.idStr, p->transId); + + taosThreadMutexLock(&(*pTask)->lock); + streamTaskClearCheckInfo((*pTask), true); + taosThreadMutexUnlock(&(*pTask)->lock); + } + } + + if ((*pTask)->exec.pWalReader != NULL) { + entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1; + if (entry.processedVer < 0) { + entry.processedVer = (*pTask)->chkInfo.processedVer; + } + + walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer); + } + + addUpdateNodeIntoHbMsg(*pTask, pMsg); + taosArrayPush(pMsg->pTaskStatus, &entry); + if (!hasMnodeEpset) { + epsetAssign(&epset, &(*pTask)->info.mnodeEpset); + hasMnodeEpset = true; + } + } + + pMsg->numOfTasks = taosArrayGetSize(pMsg->pTaskStatus); + + if (hasMnodeEpset) { + pInfo->msgSendTs = taosGetTimestampMs(); + doSendHbMsgInfo(pMsg, pMeta, &epset); + } else { + stDebug("vgId:%d no tasks or no mnd epset, not send stream hb to mnode", pMeta->vgId); + tCleanupStreamHbMsg(&pInfo->hbMsg); + pInfo->msgSendTs = -1; + } + + return TSDB_CODE_SUCCESS; +} + +void streamMetaHbToMnode(void* param, void* tmrId) { + int64_t rid = *(int64_t*)param; + + SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); + if (pMeta == NULL) { + stError("invalid rid:%" PRId64 " failed to acquired stream-meta", rid); + return; + } + + // need to stop, stop now + if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta + pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP; + stDebug("vgId:%d jump out of meta timer", pMeta->vgId); + taosReleaseRef(streamMetaId, rid); + return; + } + + // not leader not send msg + if (pMeta->role != NODE_ROLE_LEADER) { + stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role); + taosReleaseRef(streamMetaId, rid); + pMeta->pHbInfo->hbStart = 0; + return; + } + + // set the hb start time + if (pMeta->pHbInfo->hbStart == 0) { + pMeta->pHbInfo->hbStart = taosGetTimestampMs(); + } + + if (!waitForEnoughDuration(pMeta->pHbInfo)) { + taosTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); + taosReleaseRef(streamMetaId, rid); + return; + } + + streamMetaRLock(pMeta); + streamMetaSendHbHelper(pMeta); + streamMetaRUnLock(pMeta); + + taosTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); + taosReleaseRef(streamMetaId, rid); +} + +SMetaHbInfo* createMetaHbInfo(int64_t* pRid) { + SMetaHbInfo* pInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo)); + if (pInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return pInfo; + } + + pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); + pInfo->tickCounter = 0; + pInfo->stopFlag = 0; + pInfo->msgSendTs = -1; + pInfo->hbCount = 0; + return pInfo; +} + +void* destroyMetaHbInfo(SMetaHbInfo* pInfo) { + if (pInfo != NULL) { + tCleanupStreamHbMsg(&pInfo->hbMsg); + + if (pInfo->hbTmr != NULL) { + taosTmrStop(pInfo->hbTmr); + pInfo->hbTmr = NULL; + } + + taosMemoryFree(pInfo); + } + + return NULL; +} + +void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) { + // wait for the stream meta hb function stopping + if (pMeta->role == NODE_ROLE_LEADER) { + pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP; + while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) { + taosMsleep(100); + stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); + } + } +} + +void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount) { + *pStartTs = 0; + *pSendCount = 0; + + if (pInfo == NULL) { + return; + } + + *pStartTs = pInfo->hbStart; + *pSendCount = pInfo->hbCount; +} + +int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) { + stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId); + SMetaHbInfo* pInfo = pMeta->pHbInfo; + + streamMetaRLock(pMeta); + + // current waiting rsp recved + if (pRsp->msgId == pInfo->hbCount) { + tCleanupStreamHbMsg(&pInfo->hbMsg); + stDebug("vgId:%d hbMsg msgId:%d sendTs:%" PRId64 " recved confirmed", pMeta->vgId, pRsp->msgId, pInfo->msgSendTs); + + pInfo->hbCount += 1; + pInfo->msgSendTs = -1; + } else { + stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId); + } + + streamMetaRUnLock(pMeta); + return TSDB_CODE_SUCCESS; +} \ No newline at end of file diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a7f73d1b52..7b94f642e2 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -27,10 +27,8 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; -int32_t streamMetaId = 0; int32_t taskDbWrapperId = 0; -static void metaHbToMnode(void* param, void* tmrId); static int32_t streamMetaBegin(SStreamMeta* pMeta); static void streamMetaCloseImpl(void* arg); @@ -39,14 +37,6 @@ typedef struct { SHashObj* pTable; } SMetaRefMgt; -struct SMetaHbInfo { - tmr_h hbTmr; - int32_t stopFlag; - int32_t tickCounter; - int32_t hbCount; - int64_t hbStart; -}; - typedef struct STaskInitTs { int64_t start; int64_t end; @@ -352,12 +342,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas goto _err; } - pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo)); - if (pMeta->pHbInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - // task list pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId)); if (pMeta->pTaskList == NULL) { @@ -400,9 +384,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); metaRefMgtAdd(pMeta->vgId, pRid); - pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); - pMeta->pHbInfo->tickCounter = 0; - pMeta->pHbInfo->stopFlag = 0; + pMeta->pHbInfo = createMetaHbInfo(pRid); + if (pMeta->pHbInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); pMeta->bkdChkptMgt = bkdMgtCreate(tpath); @@ -486,7 +473,7 @@ void streamMetaClear(SStreamMeta* pMeta) { } void streamMetaClose(SStreamMeta* pMeta) { - stDebug("start to close stream meta"); + stDebug("vgId:%d start to close stream meta", pMeta->vgId); if (pMeta == NULL) { return; } @@ -502,11 +489,13 @@ void streamMetaClose(SStreamMeta* pMeta) { void streamMetaCloseImpl(void* arg) { SStreamMeta* pMeta = arg; - stDebug("start to do-close stream meta"); if (pMeta == NULL) { return; } + int32_t vgId = pMeta->vgId; + stDebug("vgId:%d start to do-close stream meta", vgId); + streamMetaWLock(pMeta); streamMetaClear(pMeta); streamMetaWUnLock(pMeta); @@ -526,7 +515,8 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosHashCleanup(pMeta->startInfo.pFailedTaskSet); - taosMemoryFree(pMeta->pHbInfo); + pMeta->pHbInfo = destroyMetaHbInfo(pMeta->pHbInfo); + taosMemoryFree(pMeta->path); taosThreadMutexDestroy(&pMeta->backendMutex); @@ -539,7 +529,7 @@ void streamMetaCloseImpl(void* arg) { taosThreadRwlockDestroy(&pMeta->lock); taosMemoryFree(pMeta); - stDebug("end to close stream meta"); + stDebug("vgId:%d end to close stream meta", vgId); } // todo let's check the status for each task @@ -901,13 +891,16 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { continue; } + stDebug("s-task:0x%" PRIx64 "-0x%x vgId:%d loaded from meta file, checkpointId:%" PRId64 " checkpointVer:%" PRId64, + pTask->id.streamId, pTask->id.taskId, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer); + // do duplicate task check. STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); if (code < 0) { - stError("failed to expand s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno)); + stError("failed to load s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno)); tFreeStreamTask(pTask); continue; } @@ -962,219 +955,6 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { (void)streamMetaCommit(pMeta); } -static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { - if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter - pInfo->tickCounter = 0; - return true; - } - return false; -} - -static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) { - int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes); - for (int k = 0; k < numOfExisted; ++k) { - if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) { - return true; - } - } - return false; -} - -static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { - SStreamMeta* pMeta = pTask->pMeta; - - taosThreadMutexLock(&pTask->lock); - - int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); - for (int j = 0; j < num; ++j) { - SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j); - - bool exist = existInHbMsg(pMsg, pTaskEpset); - if (!exist) { - taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId); - stDebug("vgId:%d nodeId:%d added into hb update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, - (int32_t)taosArrayGetSize(pMsg->pUpdateNodes)); - } - } - - taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList); - taosThreadMutexUnlock(&pTask->lock); -} - -static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { - SStreamHbMsg hbMsg = {0}; - SEpSet epset = {0}; - bool hasMnodeEpset = false; - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - - hbMsg.vgId = pMeta->vgId; - hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); - hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); - - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - - STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (pTask == NULL) { - continue; - } - - // not report the status of fill-history task - if ((*pTask)->info.fillHistory == 1) { - continue; - } - - STaskStatusEntry entry = { - .id = id, - .status = streamTaskGetStatus(*pTask)->state, - .nodeId = hbMsg.vgId, - .stage = pMeta->stage, - - .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), - .startTime = (*pTask)->execInfo.readyTs, - .checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId, - .checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer, - .checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime, - .checkpointInfo.latestSize = 0, - .checkpointInfo.remoteBackup = 0, - .hTaskId = (*pTask)->hTaskInfo.id.taskId, - .procsTotal = SIZE_IN_MiB((*pTask)->execInfo.inputDataSize), - .outputTotal = SIZE_IN_MiB((*pTask)->execInfo.outputDataSize), - .procsThroughput = SIZE_IN_KiB((*pTask)->execInfo.procsThroughput), - .outputThroughput = SIZE_IN_KiB((*pTask)->execInfo.outputThroughput), - .startCheckpointId = (*pTask)->execInfo.startCheckpointId, - .startCheckpointVer = (*pTask)->execInfo.startCheckpointVer, - }; - - entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); - if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { - entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate; - entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); - } - - SActiveCheckpointInfo* p = (*pTask)->chkInfo.pActiveInfo; - if (p->activeId != 0) { - entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0; - entry.checkpointInfo.activeId = p->activeId; - entry.checkpointInfo.activeTransId = p->transId; - - if (entry.checkpointInfo.failed) { - stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d, clear the active checkpointInfo", - (*pTask)->id.idStr, p->transId); - - taosThreadMutexLock(&(*pTask)->lock); - streamTaskClearCheckInfo((*pTask), true); - taosThreadMutexUnlock(&(*pTask)->lock); - } - } - - if ((*pTask)->exec.pWalReader != NULL) { - entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1; - if (entry.processedVer < 0) { - entry.processedVer = (*pTask)->chkInfo.processedVer; - } - - walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer); - } - - addUpdateNodeIntoHbMsg(*pTask, &hbMsg); - taosArrayPush(hbMsg.pTaskStatus, &entry); - if (!hasMnodeEpset) { - epsetAssign(&epset, &(*pTask)->info.mnodeEpset); - hasMnodeEpset = true; - } - } - - hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus); - - if (hasMnodeEpset) { - int32_t code = 0; - int32_t tlen = 0; - - tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code); - if (code < 0) { - stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); - goto _end; - } - - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - goto _end; - } - - SEncoder encoder; - tEncoderInit(&encoder, buf, tlen); - if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { - rpcFreeCont(buf); - stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); - goto _end; - } - tEncoderClear(&encoder); - - SRpcMsg msg = {0}; - initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); - - pMeta->pHbInfo->hbCount += 1; - stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, - pMeta->pHbInfo->hbCount); - - tmsgSendReq(&epset, &msg); - } else { - stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); - } - -_end: - tCleanupStreamHbMsg(&hbMsg); - return TSDB_CODE_SUCCESS; -} - -void metaHbToMnode(void* param, void* tmrId) { - int64_t rid = *(int64_t*)param; - - SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); - if (pMeta == NULL) { - stError("invalid rid:%" PRId64 " failed to acquired stream-meta", rid); - return; - } - - // need to stop, stop now - if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta - pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP; - stDebug("vgId:%d jump out of meta timer", pMeta->vgId); - taosReleaseRef(streamMetaId, rid); - return; - } - - // not leader not send msg - if (pMeta->role != NODE_ROLE_LEADER) { - stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role); - taosReleaseRef(streamMetaId, rid); - pMeta->pHbInfo->hbStart = 0; - return; - } - - // set the hb start time - if (pMeta->pHbInfo->hbStart == 0) { - pMeta->pHbInfo->hbStart = taosGetTimestampMs(); - } - - if (!waitForEnoughDuration(pMeta->pHbInfo)) { - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); - taosReleaseRef(streamMetaId, rid); - return; - } - - stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER)); - streamMetaRLock(pMeta); - metaHeartbeatToMnodeImpl(pMeta); - streamMetaRUnLock(pMeta); - - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); - taosReleaseRef(streamMetaId, rid); -} - bool streamMetaTaskInTimer(SStreamMeta* pMeta) { bool inTimer = false; streamMetaRLock(pMeta); @@ -1200,14 +980,19 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { void streamMetaNotifyClose(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; + int64_t startTs = 0; + int32_t sendCount = 0; + streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount); - stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", - vgId, (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); + stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", + vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount); + + // wait for the stream meta hb function stopping + streamMetaWaitForHbTmrQuit(pMeta); streamMetaWLock(pMeta); pMeta->closeFlag = true; - void* pIter = NULL; while (1) { pIter = taosHashIterate(pMeta->pTasksMap, pIter); @@ -1222,15 +1007,6 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { streamMetaWUnLock(pMeta); - // wait for the stream meta hb function stopping - if (pMeta->role == NODE_ROLE_LEADER) { - pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP; - while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) { - taosMsleep(100); - stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); - } - } - stDebug("vgId:%d start to check all tasks for closing", vgId); int64_t st = taosGetTimestampMs(); @@ -1247,10 +1023,10 @@ void streamMetaStartHb(SStreamMeta* pMeta) { int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); metaRefMgtAdd(pMeta->vgId, pRid); *pRid = pMeta->rid; - metaHbToMnode(pRid, NULL); + streamMetaHbToMnode(pRid, NULL); } -void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { +void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) { taosHashClear(pStartInfo->pReadyTaskSet); taosHashClear(pStartInfo->pFailedTaskSet); pStartInfo->tasksWillRestart = 0; @@ -1258,6 +1034,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { // reset the sentinel flag value to be 0 pStartInfo->startAllTasks = 0; + stDebug("vgId:%d clear all start-all-task info", vgId); } void streamMetaRLock(SStreamMeta* pMeta) { @@ -1336,7 +1113,7 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) { streamMetaReleaseTask(pMeta, pTask); } - metaHeartbeatToMnodeImpl(pMeta); + streamMetaSendHbHelper(pMeta); pMeta->sendMsgBeforeClosing = false; return pTaskList; } @@ -1386,16 +1163,17 @@ static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64 return TSDB_CODE_SUCCESS; } -int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expandFn) { +// restore the checkpoint id by negotiating the latest consensus checkpoint id +int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = pMeta->vgId; int64_t now = taosGetTimestampMs(); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - stInfo("vgId:%d start to check all %d stream task(s) downstream status, start ts:%" PRId64, vgId, numOfTasks, now); + stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%"PRId64, vgId, numOfTasks, now); if (numOfTasks == 0) { - stInfo("vgId:%d no tasks to be started", pMeta->vgId); + stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId); return TSDB_CODE_SUCCESS; } @@ -1406,11 +1184,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa return TSDB_CODE_SUCCESS; } - // broadcast the check downstream tasks msg + // broadcast the check downstream tasks msg only for tasks with related fill-history tasks. numOfTasks = taosArrayGetSize(pTaskList); // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without - // initialization , when the operation of check downstream tasks status is executed far quickly. + // initialization, when the operation of check downstream tasks status is executed far quickly. for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); @@ -1420,19 +1198,18 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa continue; } - if (pTask->pBackend == NULL) { // TODO: add test cases for this - code = expandFn(pTask); + if ((pTask->pBackend == NULL) && (pTask->info.fillHistory == 1 || HAS_RELATED_FILLHISTORY_TASK(pTask))) { + code = pMeta->expandTaskFn(pTask); if (code != TSDB_CODE_SUCCESS) { stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId); streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); } - } else { - stDebug("s-task:0x%x vgId:%d fill-history task backend has initialized already", pTaskId->taskId, vgId); } streamMetaReleaseTask(pMeta, pTask); } + // Tasks, with related fill-history task or without any checkpoint yet, can be started directly here. for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); @@ -1465,16 +1242,28 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa continue; } - int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); - if (ret != TSDB_CODE_SUCCESS) { - stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); - code = ret; - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + if (ret != TSDB_CODE_SUCCESS) { + stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); + code = ret; + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + } + + streamMetaReleaseTask(pMeta, pTask); + continue; } + // negotiate the consensus checkpoint id for current task + ASSERT(pTask->pBackend == NULL); + code = streamTaskSendRestoreChkptMsg(pTask); + + // this task may has no checkpoint, but others tasks may generate checkpoint already? streamMetaReleaseTask(pMeta, pTask); } + // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without + // initialization, when the operation of check downstream tasks status is executed far quickly. stInfo("vgId:%d start all task(s) completed", pMeta->vgId); taosArrayDestroy(pTaskList); return code; @@ -1535,7 +1324,7 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { return true; } -int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn expandFn) { +int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { int32_t code = 0; int32_t vgId = pMeta->vgId; stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); @@ -1556,7 +1345,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas ASSERT(pTask->status.downstreamReady == 0); if (pTask->pBackend == NULL) { - code = expandFn(pTask); + code = pMeta->expandTaskFn(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); } @@ -1638,7 +1427,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 // print the initialization elapsed time and info displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); - streamMetaResetStartInfo(pStartInfo); + streamMetaResetStartInfo(pStartInfo, pMeta->vgId); streamMetaWUnLock(pMeta); pStartInfo->completeFn(pMeta); diff --git a/source/libs/stream/src/streammsg.c b/source/libs/stream/src/streamMsg.c similarity index 93% rename from source/libs/stream/src/streammsg.c rename to source/libs/stream/src/streamMsg.c index ac67742ef5..e0435156e2 100644 --- a/source/libs/stream/src/streammsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "streammsg.h" +#include "streamMsg.h" #include "os.h" #include "tstream.h" @@ -363,6 +363,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI32(pEncoder, *pVgId) < 0) return -1; } + if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } @@ -422,6 +423,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { taosArrayPush(pReq->pUpdateNodes, &vgId); } + if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1; tEndDecode(pDecoder); return 0; } @@ -432,12 +434,16 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { } if (pMsg->pUpdateNodes != NULL) { - taosArrayDestroy(pMsg->pUpdateNodes); + pMsg->pUpdateNodes = taosArrayDestroy(pMsg->pUpdateNodes); } if (pMsg->pTaskStatus != NULL) { - taosArrayDestroy(pMsg->pTaskStatus); + pMsg->pTaskStatus = taosArrayDestroy(pMsg->pTaskStatus); } + + pMsg->msgId = -1; + pMsg->vgId = -1; + pMsg->numOfTasks = -1; } int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { @@ -621,4 +627,46 @@ int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq if (tDecodeI8(pDecoder, &pReq->dropHTask) < 0) return -1; tEndDecode(pDecoder); return 0; +} + +int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->startTs) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo) { + if (tStartEncode(pCoder) < 0) return -1; + if (tEncodeI64(pCoder, pInfo->startTs) < 0) return -1; + if (tEncodeI64(pCoder, pInfo->streamId) < 0) return -1; + if (tEncodeI32(pCoder, pInfo->taskId) < 0) return -1; + if (tEncodeI64(pCoder, pInfo->checkpointId) < 0) return -1; + tEndEncode(pCoder); + return 0; +} + +int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo) { + if (tStartDecode(pCoder) < 0) return -1; + if (tDecodeI64(pCoder, &pInfo->startTs) < 0) return -1; + if (tDecodeI64(pCoder, &pInfo->streamId) < 0) return -1; + if (tDecodeI32(pCoder, &pInfo->taskId) < 0) return -1; + if (tDecodeI64(pCoder, &pInfo->checkpointId) < 0) return -1; + tEndDecode(pCoder); + return 0; } \ No newline at end of file diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ff020e88c9..1decfe198a 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -273,6 +273,7 @@ void tFreeStreamTask(SStreamTask* pTask) { stDebug("s-task:0x%x start to free task state", taskId); streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING); taskDbRemoveRef(pTask->pBackend); + pTask->pBackend = NULL; } if (pTask->pNameMap) { @@ -830,6 +831,34 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) pDst->hTaskId = pSrc->hTaskId; } +STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + STaskExecStatisInfo* pExecInfo = &pTask->execInfo; + + STaskStatusEntry entry = { + .id = streamTaskGetTaskId(pTask), + .status = streamTaskGetStatus(pTask)->state, + .nodeId = pMeta->vgId, + .stage = pMeta->stage, + + .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue)), + .startTime = pExecInfo->readyTs, + .checkpointInfo.latestId = pTask->chkInfo.checkpointId, + .checkpointInfo.latestVer = pTask->chkInfo.checkpointVer, + .checkpointInfo.latestTime = pTask->chkInfo.checkpointTime, + .checkpointInfo.latestSize = 0, + .checkpointInfo.remoteBackup = 0, + .hTaskId = pTask->hTaskInfo.id.taskId, + .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize), + .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize), + .procsThroughput = SIZE_IN_KiB(pExecInfo->procsThroughput), + .outputThroughput = SIZE_IN_KiB(pExecInfo->outputThroughput), + .startCheckpointId = pExecInfo->startCheckpointId, + .startCheckpointVer = pExecInfo->startCheckpointVer, + }; + return entry; +} + static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { SStreamMeta* pMeta = pTask->pMeta;