From 1818edcb2b92f36f869f5781865ccffbda823ecc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 26 Jun 2024 10:18:32 +0800 Subject: [PATCH] fix(stream): consensus the start checkpoint id, and extract the streamhb related functions into a new file. --- include/common/tmsg.h | 52 +-- include/common/tmsgdef.h | 3 +- include/dnode/vnode/tqCommon.h | 1 + .../libs/stream/{streammsg.h => streamMsg.h} | 27 ++ include/libs/stream/tstream.h | 10 +- source/common/src/tmsg.c | 46 +++ source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mgmt/mgmt_snode/src/smHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/mnode/impl/inc/mndStream.h | 6 + source/dnode/mnode/impl/src/mndStream.c | 123 ++++++-- source/dnode/mnode/impl/src/mndStreamTrans.c | 2 - source/dnode/mnode/impl/src/mndStreamUtil.c | 78 +++++ source/dnode/snode/src/snode.c | 2 + source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 5 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 95 +++++- source/dnode/vnode/src/vnd/vnodeSvr.c | 5 + source/libs/stream/inc/streamInt.h | 8 +- source/libs/stream/src/streamCheckpoint.c | 57 +++- source/libs/stream/src/streamMeta.c | 298 +++--------------- .../stream/src/{streammsg.c => streamMsg.c} | 41 ++- source/libs/stream/src/streamTask.c | 28 ++ source/libs/stream/src/streamTaskSm.c | 5 +- 24 files changed, 543 insertions(+), 353 deletions(-) rename include/libs/stream/{streammsg.h => streamMsg.h} (88%) rename source/libs/stream/src/{streammsg.c => streamMsg.c} (94%) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 698ab8fac3..03e50dc9eb 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3485,12 +3485,6 @@ int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamR int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq); void tFreeMDropStreamReq(SMDropStreamReq* pReq); -typedef struct { - int64_t recoverObjUid; - int32_t taskId; - int32_t hasCheckPoint; -} SMVStreamGatherInfoReq; - typedef struct SVUpdateCheckpointInfoReq { SMsgHead head; int64_t streamId; @@ -3516,50 +3510,8 @@ typedef struct { int64_t suid; } SMqRebVgReq; -static FORCE_INLINE int tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq) { - if (tStartEncode(pCoder) < 0) return -1; - if (tEncodeI64(pCoder, pReq->leftForVer) < 0) return -1; - if (tEncodeI32(pCoder, pReq->vgId) < 0) return -1; - if (tEncodeI64(pCoder, pReq->oldConsumerId) < 0) return -1; - if (tEncodeI64(pCoder, pReq->newConsumerId) < 0) return -1; - if (tEncodeCStr(pCoder, pReq->subKey) < 0) return -1; - if (tEncodeI8(pCoder, pReq->subType) < 0) return -1; - if (tEncodeI8(pCoder, pReq->withMeta) < 0) return -1; - - if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { - if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; - } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { - if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; - if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; - } - tEndEncode(pCoder); - return 0; -} - -static FORCE_INLINE int tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq) { - if (tStartDecode(pCoder) < 0) return -1; - - if (tDecodeI64(pCoder, &pReq->leftForVer) < 0) return -1; - - if (tDecodeI32(pCoder, &pReq->vgId) < 0) return -1; - if (tDecodeI64(pCoder, &pReq->oldConsumerId) < 0) return -1; - if (tDecodeI64(pCoder, &pReq->newConsumerId) < 0) return -1; - if (tDecodeCStrTo(pCoder, pReq->subKey) < 0) return -1; - if (tDecodeI8(pCoder, &pReq->subType) < 0) return -1; - if (tDecodeI8(pCoder, &pReq->withMeta) < 0) return -1; - - if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { - if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; - } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { - if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; - if (!tDecodeIsEnd(pCoder)) { - if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; - } - } - - tEndDecode(pCoder); - return 0; -} +int32_t tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq); +int32_t tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq); typedef struct { char topic[TSDB_TOPIC_FNAME_LEN]; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index c79e66f2e2..95eb3f1d91 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -249,6 +249,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_TSMA, "get-table-tsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_TSMA, "get-tsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_CONSEN, "stream-chkpt-consen", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) @@ -333,7 +334,7 @@ TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) //1035 1036 TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT_READY, "stream-checkpoint-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL) //unused TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 4a8fc6260a..566e8dbbd8 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -29,6 +29,7 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); diff --git a/include/libs/stream/streammsg.h b/include/libs/stream/streamMsg.h similarity index 88% rename from include/libs/stream/streammsg.h rename to include/libs/stream/streamMsg.h index 723c9fd099..b16932370d 100644 --- a/include/libs/stream/streammsg.h +++ b/include/libs/stream/streamMsg.h @@ -17,6 +17,7 @@ #define TDENGINE_STREAMMSG_H #include "tmsg.h" +#include "trpc.h" #ifdef __cplusplus extern "C" { @@ -204,6 +205,26 @@ typedef struct SCheckpointReport { int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq); int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq); +typedef struct SRestoreCheckpointInfo { + SMsgHead head; + int64_t streamId; + int64_t checkpointId; // latest checkpoint id + int32_t taskId; + int32_t nodeId; +} SRestoreCheckpointInfo; + +int32_t tEncodeStreamTaskLatestChkptInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); +int32_t tDecodeStreamTaskLatestChkptInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq); + +typedef struct SRestoreCheckpointInfoRsp { + int64_t streamId; + int64_t checkpointId; + int32_t taskId; +} SRestoreCheckpointInfoRsp; + +int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo); +int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo); + typedef struct { SMsgHead head; int64_t streamId; @@ -211,6 +232,12 @@ typedef struct { int32_t reqType; } SStreamTaskRunReq; +typedef struct SCheckpointConsensusInfo { + SRestoreCheckpointInfo req; + SRpcMsg rsp; + int64_t ts; +} SCheckpointConsensusInfo; + #ifdef __cplusplus } #endif diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bf223e8c28..6acd7aa60a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -17,6 +17,7 @@ #define TDENGINE_TSTREAM_H #include "os.h" +#include "streamMsg.h" #include "streamState.h" #include "tdatablock.h" #include "tdbInt.h" @@ -24,7 +25,6 @@ #include "tmsgcb.h" #include "tqueue.h" #include "ttimer.h" -#include "streammsg.h" #ifdef __cplusplus extern "C" { @@ -704,6 +704,7 @@ void streamTaskSetRemoveBackendFiles(SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); +STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask); // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); @@ -750,10 +751,11 @@ void streamMetaResetStartInfo(STaskStartInfo* pMeta); SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta); void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader); void streamMetaLoadAllTasks(SStreamMeta* pMeta); -int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn fn); +int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); -int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn fn); +int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); bool streamMetaAllTasksReady(const SStreamMeta* pMeta); +int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask); // timer tmr_h streamTimerGetInstance(); @@ -789,6 +791,8 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq); int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq *req); void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp); +int32_t streamTaskSendLatestCheckpointInfo(SStreamTask* pTask); + #ifdef __cplusplus } #endif diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b2a1aac62d..34f5e2c94a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9249,6 +9249,52 @@ int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) { } void tDeleteSTqCheckInfo(STqCheckInfo *pInfo) { taosArrayDestroy(pInfo->colIdList); } +int32_t tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq) { + if (tStartEncode(pCoder) < 0) return -1; + if (tEncodeI64(pCoder, pReq->leftForVer) < 0) return -1; + if (tEncodeI32(pCoder, pReq->vgId) < 0) return -1; + if (tEncodeI64(pCoder, pReq->oldConsumerId) < 0) return -1; + if (tEncodeI64(pCoder, pReq->newConsumerId) < 0) return -1; + if (tEncodeCStr(pCoder, pReq->subKey) < 0) return -1; + if (tEncodeI8(pCoder, pReq->subType) < 0) return -1; + if (tEncodeI8(pCoder, pReq->withMeta) < 0) return -1; + + if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { + if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; + } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { + if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; + if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; + } + tEndEncode(pCoder); + return 0; +} + +int32_t tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeI64(pCoder, &pReq->leftForVer) < 0) return -1; + + if (tDecodeI32(pCoder, &pReq->vgId) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->oldConsumerId) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->newConsumerId) < 0) return -1; + if (tDecodeCStrTo(pCoder, pReq->subKey) < 0) return -1; + if (tDecodeI8(pCoder, &pReq->subType) < 0) return -1; + if (tDecodeI8(pCoder, &pReq->withMeta) < 0) return -1; + + if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { + if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; + } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { + if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; + if (!tDecodeIsEnd(pCoder)) { + if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; + } + } + + tEndDecode(pCoder); + return 0; +} + + int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) { int32_t nUid = taosArrayGetSize(pRes->uidList); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index e137bcbdec..677e19d4c1 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -242,6 +242,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 6a9f2f1275..7a0189b7c1 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -96,6 +96,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 6053c4db80..001696aecc 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -972,6 +972,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index dade7b4bdf..e358e0f6e5 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -34,6 +34,7 @@ extern "C" { #define MND_STREAM_TASK_RESET_NAME "stream-task-reset" #define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" #define MND_STREAM_CHKPT_UPDATE_NAME "stream-chkpt-update" +#define MND_STREAM_CHKPT_CONSEN_NAME "stream-chkpt-consen" typedef struct SStreamTransInfo { int64_t startTime; @@ -61,6 +62,7 @@ typedef struct SStreamExecInfo { TdThreadMutex lock; SHashObj *pTransferStateStreams; SHashObj *pChkptStreams; + SHashObj *pStreamConsensus; } SStreamExecInfo; extern SStreamExecInfo execInfo; @@ -131,6 +133,8 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); +int32_t mndStreamSetRestoreCheckpointId(SArray* pList, int64_t checkpointId); + void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); @@ -141,6 +145,8 @@ void mndInitExecInfo(); void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); +void mndAddConsensusTasks(SArray *pList, const SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg); +int64_t mndGetConsensusCheckpointId(SArray *pList, SStreamObj *pStream); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index af20617457..4be1682662 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -59,6 +59,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); +static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); @@ -117,6 +118,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_CONSEN, mndProcessConsensusCheckpointId); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); @@ -2514,7 +2516,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { return 0; } -static void doAddTaskInfo(SArray* pList, SCheckpointReport* pReport) { +static void doAddReportStreamTask(SArray* pList, const SCheckpointReport* pReport) { bool existed = false; for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { STaskChkptInfo* p = taosArrayGet(pList ,i); @@ -2586,12 +2588,12 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo)); - doAddTaskInfo(pList, &req); + doAddReportStreamTask(pList, &req); taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); } else { - doAddTaskInfo(*pReqTaskList, &req); + doAddReportStreamTask(*pReqTaskList, &req); } int32_t total = taosArrayGetSize(*pReqTaskList); @@ -2599,25 +2601,6 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64 " will be issued soon", req.streamId, pStream->name, total, req.checkpointId); - - // if (pStream != NULL) { - // bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); - // if (conflict) { - // mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId); - // } else { - // int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList); - // if (code == TSDB_CODE_SUCCESS) { // remove this entry - // taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); - // - // int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); - // mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId, - // numOfStreams); - // } else { - // mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", - // req.streamId); - // } - // } - // } } if (pStream != NULL) { @@ -2639,6 +2622,102 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { return 0; } +static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SDecoder decoder = {0}; + + SRestoreCheckpointInfo req = {0}; + tDecoderInit(&decoder, pReq->pCont, pReq->contLen); + + if (tDecodeStreamTaskLatestChkptInfo(&decoder, &req)) { + tDecoderClear(&decoder); + terrno = TSDB_CODE_INVALID_MSG; + mError("invalid task consensus-checkpoint msg received"); + return -1; + } + tDecoderClear(&decoder); + + mDebug("receive stream task consensus-checkpoint msg, vgId:%d, s-task:0x%" PRIx64 "-0x%x, checkpointId:%" PRId64, + req.nodeId, req.streamId, req.taskId, req.checkpointId); + + // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. + taosThreadMutexLock(&execInfo.lock); + + SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); + if (pStream == NULL) { + mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId); + + // not in meta-store yet, try to acquire the task in exec buffer + // the checkpoint req arrives too soon before the completion of the create stream trans. + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (p == NULL) { + mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId); + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + taosThreadMutexUnlock(&execInfo.lock); + return -1; + } else { + mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", + req.streamId, req.taskId); + } + } + + int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); + + SArray **pTaskList = (SArray **)taosHashGet(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId)); + if (pTaskList == NULL) { + SArray *pList = taosArrayInit(4, sizeof(SCheckpointConsensusInfo)); + mndAddConsensusTasks(pList, &req, pReq); + taosHashPut(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); + + pTaskList = (SArray **)taosHashGet(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId)); + } else { + mndAddConsensusTasks(*pTaskList, &req, pReq); + } + + int32_t total = taosArrayGetSize(*pTaskList); + if (total == numOfTasks) { // all tasks has send the reqs + // start transaction to set the checkpoint id + int64_t checkpointId = mndGetConsensusCheckpointId(*pTaskList, pStream); + mInfo("stream:0x%" PRIx64 " %s all %d tasks send latest checkpointId, the consensus-checkpointId is:%" PRId64 + " will be issued soon", + req.streamId, pStream->name, total, checkpointId); + + // start the checkpoint consensus trans + int32_t code = mndStreamSetRestoreCheckpointId(*pTaskList, checkpointId); + if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry + taosHashRemove(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId)); + int32_t numOfStreams = taosHashGetSize(execInfo.pStreamConsensus); + mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", req.streamId, numOfStreams); + } else { + mDebug("stream:0x%" PRIx64 " not start set consensus-checkpointId trans, due to not all task ready", + req.streamId); + } + } else { + mDebug("stream:0x%" PRIx64 " %d/%d tasks send consensus-checkpointId info", req.streamId, total, numOfTasks); + } + + if (pStream != NULL) { + mndReleaseStream(pMnode, pStream); + } + + taosThreadMutexUnlock(&execInfo.lock); + + pReq->info.handle = NULL; // disable auto rsp + +// { // start an transaction to set the start checkpoint id +// SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SRestoreCheckpointInfoRsp)}; +// rsp.pCont = rpcMallocCont(rsp.contLen); +// SMsgHead *pHead = rsp.pCont; +// pHead->vgId = htonl(req.nodeId); +// +// tmsgSendRsp(&rsp); +// pReq->info.handle = NULL; // disable auto rsp +// } + + return 0; +} + static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) { int32_t code = mndProcessCreateStreamReq(pReq); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 0daa383d3e..ce00267fa6 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -300,5 +300,3 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { mDebug("complete clear checkpoints in Dbs"); } - - diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index e47f28c309..4113bbe2b0 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -575,10 +575,12 @@ void mndInitExecInfo() { execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + execInfo.pStreamConsensus = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry)); taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList); + taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList); } void removeExpiredNodeInfo(const SArray *pNodeSnapshot) { @@ -817,4 +819,80 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { taosArrayDestroy(pDropped); return TSDB_CODE_SUCCESS; +} + +static int32_t doSendRestoreCheckpointInfo(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg, int64_t checkpointId) { + int32_t code = 0; + int32_t blen; + + SRestoreCheckpointInfoRsp req = {.streamId = pInfo->streamId, .taskId = pInfo->taskId, .checkpointId = checkpointId}; + + tEncodeSize(tEncodeRestoreCheckpointInfoRsp, &req, blen, code); + if (code < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + int32_t tlen = sizeof(SMsgHead) + blen; + void *abuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + tEncodeRestoreCheckpointInfoRsp(&encoder, &req); + + SMsgHead *pMsgHead = (SMsgHead *)pMsg->pCont; + pMsgHead->contLen = htonl(tlen); + pMsgHead->vgId = htonl(pInfo->nodeId); + tEncoderClear(&encoder); + + tmsgSendRsp(pMsg); + return code; +} + +int32_t mndStreamSetRestoreCheckpointId(SArray* pInfoList, int64_t checkpointId) { + for(int32_t i = 0; i < taosArrayGetSize(pInfoList); ++i) { + SCheckpointConsensusInfo* pInfo = taosArrayGet(pInfoList, i); + doSendRestoreCheckpointInfo(&pInfo->req, &pInfo->rsp, checkpointId); + } + return 0; +} + +void mndAddConsensusTasks(SArray* pList, const SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg) { + bool existed = false; + for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + STaskChkptInfo* p = taosArrayGet(pList ,i); + if (p->taskId == pInfo->taskId) { + existed = true; + break; + } + } + + if (!existed) { + SCheckpointConsensusInfo info = {0}; + memcpy(&info.req, pInfo, sizeof(info.req)); + + info.rsp.code = 0; + info.rsp.info = pMsg->info; + info.rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead); + info.rsp.pCont = rpcMallocCont(info.rsp.contLen); + + SMsgHead *pHead = info.rsp.pCont; + pHead->vgId = htonl(pInfo->nodeId); + + taosArrayPush(pList, &info); + } +} + +int64_t mndGetConsensusCheckpointId(SArray* pList, SStreamObj* pStream) { + int64_t checkpointId = INT64_MAX; + + for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + SCheckpointConsensusInfo *pInfo = taosArrayGet(pList, i); + if (pInfo->req.checkpointId < checkpointId) { + checkpointId = pInfo->req.checkpointId; + mTrace("stream:0x%" PRIx64 " %s task:0x%x vgId:%d latest checkpointId:%" PRId64, pStream->uid, pStream->name, + pInfo->req.taskId, pInfo->req.nodeId, pInfo->req.checkpointId); + } + } + + return checkpointId; } \ No newline at end of file diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 0eb0db002f..76660354ea 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -155,6 +155,8 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false); case TDMT_STREAM_TASK_UPDATE_CHKPT: return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont, pMsg->contLen); + case TDMT_MND_STREAM_CHKPT_CONSEN_RSP: + return tqStreamProcessConsensusChkptRsp(pSnode->pMeta, pMsg); default: ASSERT(0); } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 22b26498e4..1bec226489 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -262,6 +262,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 07d77905cb..d5ae4ecb96 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1029,7 +1029,6 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { - // return 0; } @@ -1273,3 +1272,7 @@ int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg); } + +int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamProcessConsensusChkptRsp(pTq->pStreamMeta, pMsg); +} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 3ea7ad0718..a47dbf1f3f 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -121,7 +121,6 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) { int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqDebug("vgId:%d no stream tasks existed to run", vgId); @@ -132,6 +131,26 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK); } +int32_t tqStreamTaskRestoreCheckpoint(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + int32_t vgId = pMeta->vgId; + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + if (numOfTasks == 0) { + tqDebug("vgId:%d no stream tasks existed to run", vgId); + return 0; + } + + tqDebug("vgId:%d restore task:0x%" PRIx64 "-0x%x checkpointId", vgId, streamId, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId); + if (pTask == NULL) { + tqError("failed to acquire task:0x%x when trying to restore checkpointId", taskId); + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + int32_t code = streamTaskSendConsensusChkptMsg(pTask); + streamMetaReleaseTask(pMeta, pTask); + return code; +} + int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -706,7 +725,8 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { if (isLeader && !tsDisableStream) { streamMetaWUnLock(pMeta); - streamMetaStartAllTasks(pMeta, tqExpandStreamTask); + ASSERT(0); + streamMetaStartAllTasks(pMeta); } else { streamMetaResetStartInfo(&pMeta->startInfo); streamMetaWUnLock(pMeta); @@ -724,10 +744,10 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t vgId = pMeta->vgId; if (type == STREAM_EXEC_T_START_ONE_TASK) { - streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId, tqExpandStreamTask); + streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId); return 0; } else if (type == STREAM_EXEC_T_START_ALL_TASKS) { - streamMetaStartAllTasks(pMeta, tqExpandStreamTask); + streamMetaStartAllTasks(pMeta); return 0; } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) { restartStreamTasks(pMeta, isLeader); @@ -854,7 +874,8 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { } else if (pState->state == TASK_STATUS__UNINIT) { tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); ASSERT(pTask->status.downstreamReady == 0); - tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); + tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId); +// tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); } else { tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); } @@ -1076,6 +1097,70 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { } streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; +} + +int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; + bool updateCheckpointId = false; + + SRestoreCheckpointInfoRsp req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, len); + + rsp.info.handle = NULL; + if (tDecodeRestoreCheckpointInfoRsp(&decoder, &req) < 0) { + // rsp.code = TSDB_CODE_MSG_DECODE_ERROR; // disable it temporarily + tqError("vgId:%d failed to decode restore task checkpointId, code:%s", vgId, tstrerror(rsp.code)); + tDecoderClear(&decoder); + return TSDB_CODE_SUCCESS; + } + + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); + if (pTask == NULL) { + tqError("vgId:%d process restore checkpointId req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, + req.taskId); + return TSDB_CODE_SUCCESS; + } + + tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode", + pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId); + + taosThreadMutexLock(&pTask->lock); + ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId); + + if ((pTask->chkInfo.checkpointId != req.checkpointId) && req.checkpointId != 0) { + tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId, + pTask->chkInfo.checkpointId, req.checkpointId); + pTask->chkInfo.checkpointId = req.checkpointId; + updateCheckpointId = true; + streamMetaSaveTask(pMeta, pTask); + } + + taosThreadMutexUnlock(&pTask->lock); + + if (updateCheckpointId) { + streamMetaWLock(pMeta); + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } + streamMetaWUnLock(pMeta); + } + + // todo: set the update transId, and discard with less transId. + if (pMeta->role == NODE_ROLE_LEADER) { + /*code = */tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId); + } else { + tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr); + } + streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f9bb636be3..1635e12176 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -647,6 +647,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg tqProcessTaskResetReq(pVnode->pTq, pMsg); } } break; + case TDMT_MND_STREAM_CHKPT_CONSEN_RSP: { + if (pVnode->restored) { + tqProcessTaskConsensusChkptRsp(pVnode->pTq, pMsg); + } + } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 08d0a5e486..2ee1839925 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -157,6 +157,7 @@ extern void* streamTimer; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; extern int32_t taskDbWrapperId; +extern int32_t streamMetaId; int32_t streamTimerInit(); void streamTimerCleanUp(); @@ -211,7 +212,12 @@ void* streamQueueNextItem(SStreamQueue* pQueue); void streamFreeQitem(SStreamQueueItem* data); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); -void streamMetaRemoveDB(void* arg, char* key); +void streamMetaRemoveDB(void* arg, char* key); +void streamMetaHbToMnode(void* param, void* tmrId); +SMetaHbInfo* createMetaHbInfo(int64_t* pRid); +void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta); +void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount); +int32_t streamMetaSendHbHelper(SStreamMeta* pMeta); ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index ec8510d9f0..118132e801 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -443,13 +443,13 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV SStreamTaskState* pStatus = streamTaskGetStatus(pTask); -// if (restored && (pStatus->state != TASK_STATUS__CK)) { -// stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 -// " failed", -// pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId); -// taosThreadMutexUnlock(&pTask->lock); -// return TSDB_CODE_STREAM_TASK_IVLD_STATUS; -// } + if (restored && (pStatus->state != TASK_STATUS__CK)) { + stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 + " failed", + pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId); + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } if (!restored) { // during restore procedure, do update checkpoint-info stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64 @@ -1103,3 +1103,46 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { s3DeleteObjects((const char**)&tmp, 1); return 0; } + +int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask) { + int32_t code; + int32_t tlen = 0; + int32_t vgId = pTask->pMeta->vgId; + const char* id = pTask->id.idStr; + SCheckpointInfo* pInfo = &pTask->chkInfo; + + ASSERT(pTask->pBackend == NULL); + + SRestoreCheckpointInfo req = { + .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId, .checkpointId = pInfo->checkpointId}; + + tEncodeSize(tEncodeStreamTaskLatestChkptInfo, &req, tlen, code); + if (code < 0) { + stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code)); + return -1; + } + + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return -1; + } + + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + if ((code = tEncodeStreamTaskLatestChkptInfo(&encoder, &req)) < 0) { + rpcFreeCont(buf); + stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, tstrerror(code)); + return -1; + } + tEncoderClear(&encoder); + + SRpcMsg msg = {0}; + initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_CONSEN, buf, tlen); + stDebug("s-task:%s vgId:%d send task latest-checkpoint-id to mnode:%" PRId64 " to reach the consensus checkpointId", + id, vgId, pInfo->checkpointId); + + tmsgSendReq(&pTask->info.mnodeEpset, &msg); + return 0; +} \ No newline at end of file diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 03c7b93f91..69124a0e37 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -27,10 +27,8 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; -int32_t streamMetaId = 0; int32_t taskDbWrapperId = 0; -static void metaHbToMnode(void* param, void* tmrId); static int32_t streamMetaBegin(SStreamMeta* pMeta); static void streamMetaCloseImpl(void* arg); @@ -39,14 +37,6 @@ typedef struct { SHashObj* pTable; } SMetaRefMgt; -struct SMetaHbInfo { - tmr_h hbTmr; - int32_t stopFlag; - int32_t tickCounter; - int32_t hbCount; - int64_t hbStart; -}; - typedef struct STaskInitTs { int64_t start; int64_t end; @@ -352,12 +342,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas goto _err; } - pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo)); - if (pMeta->pHbInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - // task list pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId)); if (pMeta->pTaskList == NULL) { @@ -400,9 +384,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); metaRefMgtAdd(pMeta->vgId, pRid); - pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); - pMeta->pHbInfo->tickCounter = 0; - pMeta->pHbInfo->stopFlag = 0; + pMeta->pHbInfo = createMetaHbInfo(pRid); + if (pMeta->pHbInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); pMeta->bkdChkptMgt = bkdMgtCreate(tpath); @@ -960,213 +947,6 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { taosArrayDestroy(pRecycleList); } -static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { - if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter - pInfo->tickCounter = 0; - return true; - } - return false; -} - -static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) { - int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes); - for (int k = 0; k < numOfExisted; ++k) { - if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) { - return true; - } - } - return false; -} - -static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { - SStreamMeta* pMeta = pTask->pMeta; - - taosThreadMutexLock(&pTask->lock); - - int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); - for (int j = 0; j < num; ++j) { - SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j); - - bool exist = existInHbMsg(pMsg, pTaskEpset); - if (!exist) { - taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId); - stDebug("vgId:%d nodeId:%d added into hb update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, - (int32_t)taosArrayGetSize(pMsg->pUpdateNodes)); - } - } - - taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList); - taosThreadMutexUnlock(&pTask->lock); -} - -static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { - SStreamHbMsg hbMsg = {0}; - SEpSet epset = {0}; - bool hasMnodeEpset = false; - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - - hbMsg.vgId = pMeta->vgId; - hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); - hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); - - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - - STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (pTask == NULL) { - continue; - } - - // not report the status of fill-history task - if ((*pTask)->info.fillHistory == 1) { - continue; - } - - STaskStatusEntry entry = { - .id = id, - .status = streamTaskGetStatus(*pTask)->state, - .nodeId = hbMsg.vgId, - .stage = pMeta->stage, - - .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), - .startTime = (*pTask)->execInfo.readyTs, - .checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId, - .checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer, - .checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime, - .checkpointInfo.latestSize = 0, - .checkpointInfo.remoteBackup = 0, - .hTaskId = (*pTask)->hTaskInfo.id.taskId, - .procsTotal = SIZE_IN_MiB((*pTask)->execInfo.inputDataSize), - .outputTotal = SIZE_IN_MiB((*pTask)->execInfo.outputDataSize), - .procsThroughput = SIZE_IN_KiB((*pTask)->execInfo.procsThroughput), - .outputThroughput = SIZE_IN_KiB((*pTask)->execInfo.outputThroughput), - .startCheckpointId = (*pTask)->execInfo.startCheckpointId, - .startCheckpointVer = (*pTask)->execInfo.startCheckpointVer, - }; - - entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); - if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { - entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate; - entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); - } - - if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) { - entry.checkpointInfo.failed = ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; - entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId; - entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId; - - if (entry.checkpointInfo.failed) { - stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.pActiveInfo->transId); - } - } - - if ((*pTask)->exec.pWalReader != NULL) { - entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1; - if (entry.processedVer < 0) { - entry.processedVer = (*pTask)->chkInfo.processedVer; - } - - walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer); - } - - addUpdateNodeIntoHbMsg(*pTask, &hbMsg); - taosArrayPush(hbMsg.pTaskStatus, &entry); - if (!hasMnodeEpset) { - epsetAssign(&epset, &(*pTask)->info.mnodeEpset); - hasMnodeEpset = true; - } - } - - hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus); - - if (hasMnodeEpset) { - int32_t code = 0; - int32_t tlen = 0; - - tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code); - if (code < 0) { - stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); - goto _end; - } - - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - goto _end; - } - - SEncoder encoder; - tEncoderInit(&encoder, buf, tlen); - if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { - rpcFreeCont(buf); - stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); - goto _end; - } - tEncoderClear(&encoder); - - SRpcMsg msg = {0}; - initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); - - pMeta->pHbInfo->hbCount += 1; - stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, - pMeta->pHbInfo->hbCount); - - tmsgSendReq(&epset, &msg); - } else { - stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); - } - -_end: - tCleanupStreamHbMsg(&hbMsg); - return TSDB_CODE_SUCCESS; -} - -void metaHbToMnode(void* param, void* tmrId) { - int64_t rid = *(int64_t*)param; - - SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); - if (pMeta == NULL) { - stError("invalid rid:%" PRId64 " failed to acquired stream-meta", rid); - return; - } - - // need to stop, stop now - if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta - pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP; - stDebug("vgId:%d jump out of meta timer", pMeta->vgId); - taosReleaseRef(streamMetaId, rid); - return; - } - - // not leader not send msg - if (pMeta->role != NODE_ROLE_LEADER) { - stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role); - taosReleaseRef(streamMetaId, rid); - pMeta->pHbInfo->hbStart = 0; - return; - } - - // set the hb start time - if (pMeta->pHbInfo->hbStart == 0) { - pMeta->pHbInfo->hbStart = taosGetTimestampMs(); - } - - if (!waitForEnoughDuration(pMeta->pHbInfo)) { - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); - taosReleaseRef(streamMetaId, rid); - return; - } - - stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER)); - streamMetaRLock(pMeta); - metaHeartbeatToMnodeImpl(pMeta); - streamMetaRUnLock(pMeta); - - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); - taosReleaseRef(streamMetaId, rid); -} - bool streamMetaTaskInTimer(SStreamMeta* pMeta) { bool inTimer = false; streamMetaRLock(pMeta); @@ -1192,14 +972,19 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { void streamMetaNotifyClose(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; + int64_t startTs = 0; + int32_t sendCount = 0; + streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount); stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", - vgId, (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); + vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount); + + // wait for the stream meta hb function stopping + streamMetaWaitForHbTmrQuit(pMeta); streamMetaWLock(pMeta); pMeta->closeFlag = true; - void* pIter = NULL; while (1) { pIter = taosHashIterate(pMeta->pTasksMap, pIter); @@ -1214,15 +999,6 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { streamMetaWUnLock(pMeta); - // wait for the stream meta hb function stopping - if (pMeta->role == NODE_ROLE_LEADER) { - pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP; - while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) { - taosMsleep(100); - stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); - } - } - stDebug("vgId:%d start to check all tasks for closing", vgId); int64_t st = taosGetTimestampMs(); @@ -1239,7 +1015,7 @@ void streamMetaStartHb(SStreamMeta* pMeta) { int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); metaRefMgtAdd(pMeta->vgId, pRid); *pRid = pMeta->rid; - metaHbToMnode(pRid, NULL); + streamMetaHbToMnode(pRid, NULL); } void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { @@ -1328,7 +1104,7 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) { streamMetaReleaseTask(pMeta, pTask); } - metaHeartbeatToMnodeImpl(pMeta); + streamMetaSendHbHelper(pMeta); pMeta->sendMsgBeforeClosing = false; return pTaskList; } @@ -1378,16 +1154,17 @@ static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64 return TSDB_CODE_SUCCESS; } -int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expandFn) { +// restore the checkpoint id by negotiating the latest consensus checkpoint id +int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = pMeta->vgId; int64_t now = taosGetTimestampMs(); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - stInfo("vgId:%d start to check all %d stream task(s) downstream status, start ts:%"PRId64, vgId, numOfTasks, now); + stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%"PRId64, vgId, numOfTasks, now); if (numOfTasks == 0) { - stInfo("vgId:%d no tasks to be started", pMeta->vgId); + stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId); return TSDB_CODE_SUCCESS; } @@ -1398,11 +1175,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa return TSDB_CODE_SUCCESS; } - // broadcast the check downstream tasks msg + // broadcast the check downstream tasks msg only for tasks with related fill-history tasks. numOfTasks = taosArrayGetSize(pTaskList); // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without - // initialization , when the operation of check downstream tasks status is executed far quickly. + // initialization, when the operation of check downstream tasks status is executed far quickly. for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); @@ -1412,19 +1189,18 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa continue; } - if (pTask->pBackend == NULL) { // TODO: add test cases for this - code = expandFn(pTask); + if ((pTask->pBackend == NULL) && (pTask->info.fillHistory == 1 || HAS_RELATED_FILLHISTORY_TASK(pTask))) { + code = pMeta->expandTaskFn(pTask); if (code != TSDB_CODE_SUCCESS) { stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId); streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); } - } else { - stDebug("s-task:0x%x vgId:%d fill-history task backend has initialized already", pTaskId->taskId, vgId); } streamMetaReleaseTask(pMeta, pTask); } + // Tasks, with related fill-history task or without any checkpoint yet, can be started directly here. for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); @@ -1457,16 +1233,28 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa continue; } - int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); - if (ret != TSDB_CODE_SUCCESS) { - stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); - code = ret; - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + if (ret != TSDB_CODE_SUCCESS) { + stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); + code = ret; + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + } + + streamMetaReleaseTask(pMeta, pTask); + continue; } + // negotiate the consensus checkpoint id for current task + ASSERT(pTask->pBackend == NULL); + code = streamTaskSendConsensusChkptMsg(pTask); + + // this task may has no checkpoint, but others tasks may generate checkpoint already? streamMetaReleaseTask(pMeta, pTask); } + // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without + // initialization, when the operation of check downstream tasks status is executed far quickly. stInfo("vgId:%d start all task(s) completed", pMeta->vgId); taosArrayDestroy(pTaskList); return code; @@ -1527,7 +1315,7 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { return true; } -int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn expandFn) { +int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { int32_t code = 0; int32_t vgId = pMeta->vgId; stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); @@ -1548,7 +1336,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas ASSERT(pTask->status.downstreamReady == 0); if (pTask->pBackend == NULL) { - code = expandFn(pTask); + code = pMeta->expandTaskFn(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); } diff --git a/source/libs/stream/src/streammsg.c b/source/libs/stream/src/streamMsg.c similarity index 94% rename from source/libs/stream/src/streammsg.c rename to source/libs/stream/src/streamMsg.c index a6ab6a60c2..13a02e5637 100644 --- a/source/libs/stream/src/streammsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ +#include "streamMsg.h" #include "os.h" -#include "streammsg.h" #include "tstream.h" int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) { @@ -77,7 +77,6 @@ int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckp return pEncoder->pos; } - int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; @@ -624,4 +623,42 @@ int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq if (tDecodeI8(pDecoder, &pReq->dropHTask) < 0) return -1; tEndDecode(pDecoder); return 0; +} + +int32_t tEncodeStreamTaskLatestChkptInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamTaskLatestChkptInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo) { + if (tStartEncode(pCoder) < 0) return -1; + if (tEncodeI64(pCoder, pInfo->streamId) < 0) return -1; + if (tEncodeI32(pCoder, pInfo->taskId) < 0) return -1; + if (tEncodeI64(pCoder, pInfo->checkpointId) < 0) return -1; + tEndEncode(pCoder); + return 0; +} + +int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo) { + if (tStartDecode(pCoder) < 0) return -1; + if (tDecodeI64(pCoder, &pInfo->streamId) < 0) return -1; + if (tDecodeI32(pCoder, &pInfo->taskId) < 0) return -1; + if (tDecodeI64(pCoder, &pInfo->checkpointId) < 0) return -1; + tEndDecode(pCoder); + return 0; } \ No newline at end of file diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 70e3790209..e968522854 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -832,6 +832,34 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) pDst->hTaskId = pSrc->hTaskId; } +STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + STaskExecStatisInfo* pExecInfo = &pTask->execInfo; + + STaskStatusEntry entry = { + .id = streamTaskGetTaskId(pTask), + .status = streamTaskGetStatus(pTask)->state, + .nodeId = pMeta->vgId, + .stage = pMeta->stage, + + .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue)), + .startTime = pExecInfo->readyTs, + .checkpointInfo.latestId = pTask->chkInfo.checkpointId, + .checkpointInfo.latestVer = pTask->chkInfo.checkpointVer, + .checkpointInfo.latestTime = pTask->chkInfo.checkpointTime, + .checkpointInfo.latestSize = 0, + .checkpointInfo.remoteBackup = 0, + .hTaskId = pTask->hTaskInfo.id.taskId, + .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize), + .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize), + .procsThroughput = SIZE_IN_KiB(pExecInfo->procsThroughput), + .outputThroughput = SIZE_IN_KiB(pExecInfo->outputThroughput), + .startCheckpointId = pExecInfo->startCheckpointId, + .startCheckpointVer = pExecInfo->startCheckpointVer, + }; + return entry; +} + static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { SStreamMeta* pMeta = pTask->pMeta; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 75d62ff324..e543f2f1d4 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -199,9 +199,7 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName, static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent event) { SStreamTaskSM* pSM = pTask->status.pSM; - bool removed = false; - taosThreadMutexLock(&pTask->lock); - + bool removed = false; int32_t num = taosArrayGetSize(pSM->pWaitingEventList); for (int32_t i = 0; i < num; ++i) { SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i); @@ -218,7 +216,6 @@ static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent eve stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, StreamTaskEventList[event].name); } - taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_SUCCESS; }