diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a27af43089..90ff1a36c0 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -184,6 +184,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index add8a48953..ed20492fff 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -45,6 +45,11 @@ typedef struct SStreamVnodeRevertIndex { int64_t ts; // snapshot ts } SStreamVnodeRevertIndex; +typedef struct SVgroupChangeInfo { + SHashObj *pDBMap; + SArray *pUpdateNodeList; // SArray +} SVgroupChangeInfo; + static int32_t mndNodeCheckSentinel = 0; static SStreamVnodeRevertIndex execNodeList; @@ -68,6 +73,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq); static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq); static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, int64_t streamId, int32_t taskId); +static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); @@ -86,7 +92,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); - mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheckReq); + mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); @@ -100,6 +106,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq); @@ -869,9 +876,9 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); pMsg->checkpointId = checkpointId; - SRpcMsg rpcMsg = { - .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)}; - // tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + int32_t size = sizeof(SMStreamDoCheckpointMsg); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size}; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); return 0; } @@ -1178,7 +1185,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); return -1; } - //mndTransSetSerial(pTrans); + // mndTransSetSerial(pTrans); // drop all tasks if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { @@ -1744,11 +1751,6 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -typedef struct SVgroupChangeInfo { - SHashObj *pDBMap; - SArray *pUpdateNodeList; // SArray -} SVgroupChangeInfo; - static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, int64_t streamId, int32_t taskId) { pMsg->streamId = streamId; @@ -1977,7 +1979,7 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode) { return pVgroupListSnapshot; } -int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { +static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { SSdb *pSdb = pMnode->pSdb; // check all streams that involved this vnode should update the epset info @@ -2098,6 +2100,23 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } +typedef struct SMStreamNodeCheckMsg { +} SMStreamNodeCheckMsg; + +static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + if (sdbGetSize(pSdb, SDB_STREAM) <= 0) { + return 0; + } + + SMStreamNodeCheckMsg *pMsg = rpcMallocCont(sizeof(SMStreamNodeCheckMsg)); + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = sizeof(SMStreamNodeCheckMsg)}; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + return 0; +} + // todo: this process should be executed by the write queue worker of the mnode // int32_t mndProcessStreamHb(SRpcMsg *pReq) { // SMnode *pMnode = pReq->info.node; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 57f736de64..df9813ad9b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -171,6 +171,10 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, pTask->id.idStr); } else { + if (pBlock->type == STREAM_INPUT__TRANS_STATE) { + pTask->status.appendTranstateBlock = true; + } + int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); // input queue is full, upstream is blocked now status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;