From cbf8b363fc3317f4c638531a220785b6da3857e2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 10 Jun 2024 00:46:01 +0800 Subject: [PATCH] fix(stream): update checkpoint info by using trans. --- include/common/tmsg.h | 4 - include/common/tmsgdef.h | 5 +- include/dnode/vnode/tqCommon.h | 1 + include/libs/stream/streammsg.h | 14 ++ include/libs/stream/tstream.h | 3 +- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 2 + source/dnode/mgmt/mgmt_snode/src/smHandle.c | 9 +- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/mnode/impl/inc/mndStream.h | 34 +++- source/dnode/mnode/impl/src/mndMain.c | 2 - source/dnode/mnode/impl/src/mndStream.c | 201 +++++++++++++++++++- source/dnode/mnode/impl/src/mndStreamHb.c | 19 +- source/dnode/mnode/impl/src/mndStreamUtil.c | 112 +++++++++++ source/dnode/snode/src/snode.c | 2 + source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 4 + source/dnode/vnode/src/tqCommon/tqCommon.c | 2 + source/dnode/vnode/src/vnd/vnodeSvr.c | 2 + source/libs/stream/src/streamCheckpoint.c | 22 ++- source/libs/stream/src/streamTask.c | 65 ++++--- source/libs/stream/src/streammsg.c | 28 +++ 21 files changed, 465 insertions(+), 68 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fa294c3fc5..98ca414466 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3627,10 +3627,6 @@ typedef struct { int32_t taskId; } SVPauseStreamTaskReq, SVResetStreamTaskReq; -typedef struct { - int8_t reserved; -} SVPauseStreamTaskRsp; - typedef struct { char name[TSDB_STREAM_FNAME_LEN]; int8_t igNotExists; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index d1ac2c79c3..db5ae21692 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -207,9 +207,9 @@ TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) // unused,reserved TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, "stream-checkpoint-remain", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT_NOTIFY, "grant-notify", NULL, NULL) @@ -370,6 +370,7 @@ TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_GET_STREAM_PROGRESS, "vnd-stream-progress", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_VND_STREAM_MSG) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 0076d79312..0fb690e756 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -28,6 +28,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); diff --git a/include/libs/stream/streammsg.h b/include/libs/stream/streammsg.h index 91bfc6afc8..723c9fd099 100644 --- a/include/libs/stream/streammsg.h +++ b/include/libs/stream/streammsg.h @@ -190,6 +190,20 @@ typedef struct SCheckpointTriggerRsp { int32_t rspCode; } SCheckpointTriggerRsp; +typedef struct SCheckpointReport { + int64_t streamId; + int32_t taskId; + int32_t nodeId; + int64_t checkpointId; + int64_t checkpointVer; + int64_t checkpointTs; + int32_t transId; + int8_t dropHTask; +} SCheckpointReport; + +int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq); +int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq); + typedef struct { SMsgHead head; int64_t streamId; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d07a302920..1996c2ed63 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -762,8 +762,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t setCode); -int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId, - SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask); +int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask); int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq); SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo(); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index e4afba6722..3d5da860ad 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -232,6 +232,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -240,6 +241,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 9b07b6a3d8..856d7b2051 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -75,26 +75,27 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index bfc9e92293..41b9352f18 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -967,6 +967,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToWriteQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index c8a967620c..6ca61265bb 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -26,13 +26,14 @@ extern "C" { #define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_VER_NUMBER 5 -#define MND_STREAM_CREATE_NAME "stream-create" -#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" -#define MND_STREAM_PAUSE_NAME "stream-pause" -#define MND_STREAM_RESUME_NAME "stream-resume" -#define MND_STREAM_DROP_NAME "stream-drop" -#define MND_STREAM_TASK_RESET_NAME "stream-task-reset" -#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" +#define MND_STREAM_CREATE_NAME "stream-create" +#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" +#define MND_STREAM_PAUSE_NAME "stream-pause" +#define MND_STREAM_RESUME_NAME "stream-resume" +#define MND_STREAM_DROP_NAME "stream-drop" +#define MND_STREAM_TASK_RESET_NAME "stream-task-reset" +#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" +#define MND_STREAM_CHKPT_UPDATE_NAME "stream-chkpt-update" typedef struct SStreamTransInfo { int64_t startTime; @@ -51,6 +52,7 @@ typedef struct SStreamTransMgmt { } SStreamTransMgmt; typedef struct SStreamExecInfo { + bool initTaskList; SArray *pNodeList; int64_t ts; // snapshot ts SStreamTransMgmt transMgmt; @@ -58,6 +60,7 @@ typedef struct SStreamExecInfo { SArray *pTaskList; TdThreadMutex lock; SHashObj *pTransferStateStreams; + SHashObj *pChkptStreams; } SStreamExecInfo; extern SStreamExecInfo execInfo; @@ -78,7 +81,18 @@ typedef struct SOrphanTask { typedef struct { SMsgHead head; -} SMStreamHbRspMsg, SMStreamReqCheckpointRspMsg; +} SMStreamHbRspMsg, SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp; + +typedef struct STaskChkptInfo { + int32_t nodeId; + int32_t taskId; + int64_t streamId; + int64_t checkpointId; + int64_t version; + int64_t ts; + int32_t transId; + int8_t dropHTask; +}STaskChkptInfo; int32_t mndInitStream(SMnode *pMnode); void mndCleanupStream(SMnode *pMnode); @@ -114,6 +128,10 @@ int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *p int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); +int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); +void scanCheckpointReportInfo(SMnode *pMnode); +void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); void destroyStreamTaskIter(SStreamTaskIter *pIter); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 9252843d2f..cad8c6d745 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -680,8 +680,6 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { return NULL; } - mndInitStreamExecInfo(pMnode, &execInfo); - mInfo("mnode open successfully"); return pMnode; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 1f1eaa999e..7ee34d502a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -58,10 +58,10 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); +static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo); static void removeExpiredNodeInfo(const SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); @@ -104,6 +104,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp); // for msgs inside mnode // TODO change the name @@ -114,6 +115,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); @@ -1091,7 +1093,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { } static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { - bool ready = true; + bool ready = true; if (taskNodeIsUpdated(pMnode)) { return -1; } @@ -1102,6 +1104,8 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0); } + SArray* pInvalidList = taosArrayInit(4, sizeof(STaskId)); + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId *p = taosArrayGet(execInfo.pTaskList, i); STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); @@ -1109,11 +1113,32 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { continue; } + // for stopped stream task entry, we do additional check for it + if (pEntry->status == TASK_STATUS__STOP) { + bool invalid = false; + for(int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) { + STaskId* pId = taosArrayGet(pInvalidList, j); + if (pEntry->id.streamId == pId->streamId) { + taosArrayPush(pInvalidList, &pEntry->id); + invalid = true; + break; + } + } + + if (!invalid) { + SStreamObj *pObj = mndGetStreamObj(pMnode, pEntry->id.streamId); + if (pObj == NULL) { + taosArrayPush(pInvalidList, &pEntry->id); + } else { + mndReleaseStream(pMnode, pObj); + } + } + } + if (pEntry->status != TASK_STATUS__READY) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued", pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status)); ready = false; - break; } if (pEntry->hTaskId != 0) { @@ -1126,6 +1151,8 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { } } + removeTasksInBuf(pInvalidList, &execInfo); + taosThreadMutexUnlock(&execInfo.lock); return ready ? 0 : -1; } @@ -1801,6 +1828,10 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock int32_t numOfRows = 0; SStreamObj *pStream = NULL; + taosThreadMutexLock(&execInfo.lock); + mndInitStreamExecInfo(pMnode, &execInfo); + taosThreadMutexUnlock(&execInfo.lock); + while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); if (pShow->pIter == NULL) { @@ -2437,13 +2468,136 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { taosThreadMutexUnlock(&execInfo.lock); { - SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRspMsg)}; + SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)}; rsp.pCont = rpcMallocCont(rsp.contLen); SMsgHead *pHead = rsp.pCont; pHead->vgId = htonl(req.nodeId); tmsgSendRsp(&rsp); + pReq->info.handle = NULL; // disable auto rsp + } + return 0; +} + +static void doAddTaskInfo(SArray* pList, SCheckpointReport* pReport) { + bool existed = false; + for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + STaskChkptInfo* p = taosArrayGet(pList ,i); + if (p->taskId == pReport->taskId) { + existed = true; + break; + } + } + + if (!existed) { + STaskChkptInfo info = { + .streamId = pReport->streamId, + .taskId = pReport->taskId, + .transId = pReport->transId, + .dropHTask = pReport->dropHTask, + .version = pReport->checkpointVer, + .ts = pReport->checkpointTs, + .checkpointId = pReport->checkpointId, + .nodeId = pReport->nodeId, + }; + taosArrayPush(pList, &info); + } +} + +int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SCheckpointReport req = {0}; + + SDecoder decoder = {0}; + tDecoderInit(&decoder, pReq->pCont, pReq->contLen); + + if (tDecodeStreamTaskChkptReport(&decoder, &req)) { + tDecoderClear(&decoder); + terrno = TSDB_CODE_INVALID_MSG; + mError("invalid task checkpoint-report msg received"); + return -1; + } + tDecoderClear(&decoder); + + mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64 + " checkpointVer:%" PRId64 " transId:%d", + req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId); + + // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. + taosThreadMutexLock(&execInfo.lock); + + SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); + if (pStream == NULL) { + mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", + req.streamId); + + // not in meta-store yet, try to acquire the task in exec buffer + // the checkpoint req arrives too soon before the completion of the create stream trans. + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (p == NULL) { + mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId); + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + taosThreadMutexUnlock(&execInfo.lock); + return -1; + } else { + mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", + req.streamId, req.taskId); + } + } + + int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); + + SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + if (pReqTaskList == NULL) { + SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo)); + doAddTaskInfo(pList, &req); + taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); + + pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + } else { + doAddTaskInfo(*pReqTaskList, &req); + } + + int32_t total = taosArrayGetSize(*pReqTaskList); + if (total == numOfTasks) { // all tasks has send the reqs + mInfo("stream:0x%" PRIx64 + " %s all %d tasks send checkpoint-report, start to update checkpoint meta-info for checkpointId:%" PRId64, + req.streamId, pStream->name, total, req.checkpointId); + + if (pStream != NULL) { + bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); + if (conflict) { + mDebug("stream:%x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId); + } else { + int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList); + if (code == TSDB_CODE_SUCCESS) { // remove this entry + taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + + int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); + mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId, numOfStreams); + } else { + mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", + req.streamId); + } + } + } + } + + if (pStream != NULL) { + mndReleaseStream(pMnode, pStream); + } + + taosThreadMutexUnlock(&execInfo.lock); + + { + SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamUpdateChkptRsp)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + SMsgHead *pHead = rsp.pCont; + pHead->vgId = htonl(req.nodeId); + + tmsgSendRsp(&rsp); pReq->info.handle = NULL; // disable auto rsp } @@ -2473,8 +2627,13 @@ static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) { } void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) { + if (pExecInfo->initTaskList) { + return; + } + addAllStreamTasksIntoBuf(pMnode, pExecInfo); extractNodeListFromStream(pMnode, pExecInfo->pNodeList); + pExecInfo->initTaskList = true; } void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) { @@ -2491,4 +2650,38 @@ void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) { saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo); sdbRelease(pSdb, pStream); } +} + +int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray* pChkptInfoList) { + STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME, "update checkpoint-info"); + if (pTrans == NULL) { + return terrno; + } + + /*int32_t code = */mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid); + int32_t code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream); + if (code != 0) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return code; + } + + code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); + if (code != TSDB_CODE_SUCCESS) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + + return TSDB_CODE_ACTION_IN_PROGRESS; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 6f398fbc11..7c07d003b7 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -241,6 +241,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); + mndInitStreamExecInfo(pMnode, &execInfo); + int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId); @@ -266,18 +268,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { snodeChanged = true; } } else { - // task is idle for more than 50 sec. -// if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { -// if (!pTaskEntry->inputQChanging) { -// pTaskEntry->inputQUnchangeCounter++; -// } else { -// pTaskEntry->inputQChanging = false; -// } -// } else { -// pTaskEntry->inputQChanging = true; -// pTaskEntry->inputQUnchangeCounter = 0; -// } - streamTaskStatusCopy(pTaskEntry, p); STaskCkptInfo *pChkInfo = &p->checkpointInfo; @@ -288,6 +278,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SFailedCheckpointInfo info = { .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; addIntoCheckpointList(pFailedChkpt, &info); + + // remove failed trans from pChkptStreams + taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId)); } } @@ -333,6 +326,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mndDropOrphanTasks(pMnode, pOrphanTasks); } + scanCheckpointReportInfo(pMnode); + taosThreadMutexUnlock(&execInfo.lock); tCleanupStreamHbMsg(&req); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 7e72f67f45..695c33f646 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -574,6 +574,7 @@ void mndInitExecInfo() { execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); + execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry)); taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); @@ -687,4 +688,115 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { taosArrayDestroy(pRemovedTasks); return 0; +} + +static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { + SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return terrno; + } + + pReq->head.vgId = htonl(pTask->info.nodeId); + pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + + SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId)); + ASSERT(pReqTaskList); + + int32_t size = taosArrayGetSize(*pReqTaskList); + for(int32_t i = 0; i < size; ++i) { + STaskChkptInfo* pInfo = taosArrayGet(*pReqTaskList, i); + if (pInfo->taskId == pTask->id.taskId) { + pReq->checkpointId = pInfo->checkpointId; + pReq->checkpointVer = pInfo->version; + pReq->checkpointTs = pInfo->ts; + pReq->dropRelHTask = pInfo->dropHTask; + pReq->transId = pInfo->transId; + } + } + + SEpSet epset = {0}; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS || !hasEpset) { + taosMemoryFree(pReq); + return code; + } + + code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pReq); + } + + return code; +} + +int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + taosWLockLatch(&pStream->lock); + + SStreamTaskIter *pIter = createStreamTaskIter(pStream); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); + + int32_t code = doSetUpdateChkptAction(pMnode, pTrans, pTask); + if (code != TSDB_CODE_SUCCESS) { + destroyStreamTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return -1; + } + } + + destroyStreamTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return 0; +} + +void scanCheckpointReportInfo(SMnode* pMnode) { + void* pIter = NULL; + mDebug("start to scan checkpoint report info"); + + while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { + SArray *pList = *(SArray **)pIter; + + STaskChkptInfo* pInfo = taosArrayGet(pList, 0); + SStreamObj* pStream = mndGetStreamObj(pMnode, pInfo->streamId); + if (pStream == NULL) { + mError("failed to acquire stream:0x%"PRIx64" remove it from checkpoint-report list", pInfo->streamId); + taosHashRemove(execInfo.pChkptStreams, &pInfo->streamId, sizeof(pInfo->streamId)); + continue; + } + + int32_t total = mndGetNumOfStreamTasks(pStream); + int32_t existed = (int32_t) taosArrayGetSize(pList); + + if (total == existed) { + mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", + pStream->uid, pStream->name, total); + + bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); + if (!conflict) { + int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList); + if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry + taosHashRemove(execInfo.pChkptStreams, &pInfo->streamId, sizeof(pInfo->streamId)); + + int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); + mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", pInfo->streamId, numOfStreams); + } else { + mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", + pInfo->streamId); + } + } else { + mDebug("stream:%x%"PRIx64" active checkpoint trans not finished yet, wait", pInfo->streamId); + } + } else { + mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name, + existed, total, total - existed); + } + + sdbRelease(pMnode->pSdb, pStream); + } + } \ No newline at end of file diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index c61988574c..b0d61ebc06 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -136,6 +136,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: return tqStreamProcessCheckpointReadyRsp(pSnode->pMeta, pMsg); + case TDMT_MND_STREAM_CHKPT_REPORT_RSP: + return tqStreamProcessChkptReportRsp(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER: return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 8222af4d60..7f5ab8b6e6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -261,6 +261,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 712cfbaa55..a2ca3662d7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1278,3 +1278,7 @@ int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessCheckpointReadyRsp(pTq->pStreamMeta, pMsg); } + +int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg); +} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c55745e5c5..7c8ad159bc 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1078,6 +1078,8 @@ int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return d int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } +int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {return doProcessDummyRspMsg(pMeta, pMsg);} + int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 002f04b8a7..b1f353af81 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -854,6 +854,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); case TDMT_VND_GET_STREAM_PROGRESS: return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg); + case TDMT_MND_STREAM_CHKPT_REPORT_RSP: + return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index ab3b5d6fa0..56ac794d8e 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -441,8 +441,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin if (pReq->checkpointId <= pInfo->checkpointId) { stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64 " ignored", - id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer); + " no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " transId:%d ignored", + id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer, + pReq->transId); taosThreadMutexUnlock(&pTask->lock); { // destroy the related fill-history tasks @@ -703,9 +705,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null. if (code == TSDB_CODE_SUCCESS && (pTask->pMsgCb != NULL)) { - STaskId* pHTaskId = &pTask->hTaskInfo.id; - code = streamBuildAndSendCheckpointUpdateMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, pHTaskId, &pTask->chkInfo, - dropRelHTask); + code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask); if (code == TSDB_CODE_SUCCESS) { code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id); if (code != TSDB_CODE_SUCCESS) { @@ -770,6 +770,18 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { streamMetaReleaseTask(pTask->pMeta, pTask); return; } + + // checkpoint-trigger recv flag is set, quit + if (pActiveInfo->allUpstreamTriggerRecv) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", + pTask->id.idStr, vgId, ref); + + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + taosThreadMutexUnlock(&pTask->lock); taosThreadMutexLock(&pActiveInfo->lock); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index dc9d2166e6..f8a4deb6b1 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -731,35 +731,50 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI return code; } -int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId, - SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) { - SVUpdateCheckpointInfoReq* pReq = rpcMallocCont(sizeof(SVUpdateCheckpointInfoReq)); - if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; +int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) { + int32_t code; + int32_t tlen = 0; + int32_t vgId = pTask->pMeta->vgId; + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo; + + SCheckpointReport req = {.streamId = pTask->id.streamId, + .taskId = pTask->id.taskId, + .nodeId = vgId, + .dropHTask = dropRelHTask, + .transId = pActive->transId, + .checkpointId = pActive->activeId, + .checkpointVer = pCheckpointInfo->processedVer, + .checkpointTs = pCheckpointInfo->startTs}; + + tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code); + if (code < 0) { + stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code)); return -1; } - pReq->head.vgId = vgId; - pReq->taskId = pTaskId->taskId; - pReq->streamId = pTaskId->streamId; - pReq->dropRelHTask = dropRelHTask; - pReq->hStreamId = pHTaskId->streamId; - pReq->hTaskId = pHTaskId->taskId; - pReq->transId = pCheckpointInfo->pActiveInfo->transId; - - pReq->checkpointId = pCheckpointInfo->pActiveInfo->activeId; - pReq->checkpointVer = pCheckpointInfo->processedVer; - pReq->checkpointTs = pCheckpointInfo->startTs; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_UPDATE_CHKPT, .pCont = pReq, .contLen = sizeof(SVUpdateCheckpointInfoReq)}; - int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg); - - if (code != TSDB_CODE_SUCCESS) { - stError("vgId:%d task:0x%x failed to send update checkpoint info msg, code:%s", vgId, pTaskId->taskId, tstrerror(code)); - } else { - stDebug("vgId:%d task:0x%x build and send update checkpoint info msg msg", vgId, pTaskId->taskId); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return -1; } - return code; + + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) { + rpcFreeCont(buf); + stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code)); + return -1; + } + tEncoderClear(&encoder); + + SRpcMsg msg = {0}; + initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen); + stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId); + + tmsgSendReq(&pTask->info.mnodeEpset, &msg); + return 0; } STaskId streamTaskGetTaskId(const SStreamTask* pTask) { diff --git a/source/libs/stream/src/streammsg.c b/source/libs/stream/src/streammsg.c index 705406f044..a6ab6a60c2 100644 --- a/source/libs/stream/src/streammsg.c +++ b/source/libs/stream/src/streammsg.c @@ -594,6 +594,34 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->checkpointVer) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->checkpointTs) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->dropHTask) < 0) return -1; + tEndEncode(pEncoder); + return 0; +} + +int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->checkpointVer) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->checkpointTs) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->dropHTask) < 0) return -1; tEndDecode(pDecoder); return 0; } \ No newline at end of file