diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index d05868c2c9..f389bc1a61 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -217,6 +217,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_COMPACT, "kill-compact", SKillCompactReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_TIMER, "compact-tmr", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_REQ_CHKPT, "stream-req-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c6923a2233..bea49d7696 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -640,6 +640,7 @@ typedef struct { int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq); int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq); +// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished typedef struct { int64_t streamId; int64_t checkpointId; @@ -648,6 +649,7 @@ typedef struct { SEpSet mgmtEps; int32_t mnodeId; int32_t transId; + int8_t mndTrigger; int64_t expireTime; } SStreamCheckpointSourceReq; @@ -770,6 +772,15 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq); void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); +typedef struct SStreamTaskCheckpointReq { + int64_t streamId; + int32_t taskId; + int32_t nodeId; +} SStreamTaskCheckpointReq; + +int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq); +int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq); + int32_t streamSetupScheduleTrigger(SStreamTask* pTask); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg); @@ -839,6 +850,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); +int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 75d527bc6c..30a9118274 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -767,7 +767,7 @@ _OVER: pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER || pMsg->msgType == TDMT_MND_COMPACT_TIMER || pMsg->msgType == TDMT_MND_NODECHECK_TIMER || pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE || - pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_TIMER) { + pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_TIMER || pMsg->msgType == TDMT_MND_STREAM_REQ_CHKPT) { mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, pMnode->stopped, state.restored, syncStr(state.state)); return -1; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 49d97fb38f..55951c19bb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -67,7 +67,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static SArray *extractNodeListFromStream(SMnode *pMnode); static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); - +static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); @@ -130,6 +130,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, mndProcessStreamCheckpointInCandid); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); @@ -980,22 +981,6 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { return 0; } -static int32_t mndProcessStreamRemainChkptTmr(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - if (sdbGetSize(pSdb, SDB_STREAM) <= 0) { - return 0; - } - - SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); - pMsg->checkpointId = 0; - - int32_t size = sizeof(SMStreamDoCheckpointMsg); - SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pMsg, .contLen = size}; - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - return 0; -} - static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, int64_t streamId, int32_t taskId, int32_t transId) { SStreamCheckpointSourceReq req = {0}; @@ -1005,6 +990,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in req.streamId = streamId; // pTask->id.streamId; req.taskId = taskId; // pTask->id.taskId; req.transId = transId; + req.mndTrigger = 1; int32_t code; int32_t blen; @@ -3093,3 +3079,26 @@ SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { return NULL; } + +int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + + SStreamTaskCheckpointReq req = {0}; + + SDecoder decoder = {0}; + tDecoderInit(&decoder, pReq->pCont, pReq->contLen); + + if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) { + tDecoderClear(&decoder); + terrno = TSDB_CODE_INVALID_MSG; + mError("invalid task checkpoint req msg received"); + return -1; + } + tDecoderClear(&decoder); + + mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId); + + // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. + + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index c1a4754b62..38c3441d43 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -235,7 +235,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index eb50efadeb..16577fb4e7 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -36,6 +36,7 @@ int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckp if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->mndTrigger) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } @@ -50,6 +51,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->mndTrigger) < 0) return -1; tEndDecode(pDecoder); return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1ec8843c0c..9ecb63aa22 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -398,13 +398,14 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); - // 3. resume the state of stream task, after this function, the stream task will run immediately. - streamTaskResume(pStreamTask); + // 3. send msg to mnode to launch a checkpoint to keep the state for current stream + streamTaskSendCheckpointReq(pStreamTask); +// streamTaskResume(pStreamTask); - stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id); +// stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id); // 4. free it and remove fill-history task from disk meta-store - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); +// streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 5. assign the status to the value that will be kept in disk pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state; @@ -412,20 +413,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 6. open the inputQ for all upstream tasks streamTaskOpenAllUpstreamInput(pStreamTask); - // 7. add empty delete block - if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) { - SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); - - SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); - pDelBlock->info.rows = 0; - pDelBlock->info.version = 0; - pItem->type = STREAM_INPUT__REF_DATA_BLOCK; - pItem->pBlock = pDelBlock; - int32_t code = streamTaskPutDataIntoInputQ(pStreamTask, (SStreamQueueItem*)pItem); - stDebug("s-task:%s append dummy delete block,res:%d", pStreamTask->id.idStr, code); - } - - streamSchedExec(pStreamTask); +// streamSchedExec(pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask); return TSDB_CODE_SUCCESS; } @@ -443,7 +431,7 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. code = streamDoTransferStateToStreamTask(pTask); - } else { // drop fill-history task and open inputQ of sink task + } else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task. SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask != NULL) { streamTaskOpenAllUpstreamInput(pStreamTask); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 5e1566c1e1..9ca0596673 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -1054,6 +1054,23 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) return 0; } +int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; + return 0; +} + +int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 094068a06e..cf7b557e1f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -852,3 +852,41 @@ void streamTaskResume(SStreamTask* pTask) { bool streamTaskIsSinkTask(const SStreamTask* pTask) { return pTask->info.taskLevel == TASK_LEVEL__SINK; } + +int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { + int32_t code; + int32_t tlen = 0; + int32_t vgId = pTask->pMeta->vgId; + const char* id = pTask->id.idStr; + + SStreamTaskCheckpointReq req = {0}; + tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code); + if (code < 0) { + stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code)); + return -1; + } + + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return -1; + } + + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) { + rpcFreeCont(buf); + stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code)); + return -1; + } + tEncoderClear(&encoder); + + SRpcMsg msg = {.info.noResp = 1}; + initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen); + + stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId); + + tmsgSendReq(&pTask->info.mnodeEpset, &msg); + return 0; +}