From a18989df882827d50b8f300f651461fe146a8bb9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 17 Aug 2023 18:51:39 +0800 Subject: [PATCH] fix(stream): add lost codes caused by merge 3.0. --- include/common/tmsgdef.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 38 ++++++++++++++++++------- source/libs/stream/src/stream.c | 4 +++ 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a27af43089..90ff1a36c0 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -184,6 +184,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b296255995..da6d35327e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -40,11 +40,15 @@ typedef struct SNodeEntry { } SNodeEntry; typedef struct SStreamVnodeRevertIndex { - SArray* pDBList; SArray* pNodeEntryList; int64_t ts; // snapshot ts } SStreamVnodeRevertIndex; +typedef struct SVgroupChangeInfo { + SHashObj* pDBMap; + SArray* pUpdateNodeList; //SArray +} SVgroupChangeInfo; + static int32_t mndNodeCheckSentinel = 0; static SStreamVnodeRevertIndex execNodeList; @@ -68,6 +72,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq); static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq); static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, int64_t streamId, int32_t taskId); +static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans); @@ -86,7 +91,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); - mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheckReq); + mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); @@ -100,6 +105,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq); @@ -868,9 +874,9 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); pMsg->checkpointId = checkpointId; - SRpcMsg rpcMsg = { - .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)}; -// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + int32_t size = sizeof(SMStreamDoCheckpointMsg); + SRpcMsg rpcMsg = { .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size}; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); return 0; } @@ -1741,11 +1747,6 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -typedef struct SVgroupChangeInfo { - SHashObj* pDBMap; - SArray* pUpdateNodeList; //SArray -} SVgroupChangeInfo; - static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo, int64_t streamId, int32_t taskId) { pMsg->streamId = streamId; pMsg->taskId = taskId; @@ -1973,7 +1974,7 @@ static SArray* mndTakeVgroupSnapshot(SMnode* pMnode) { return pVgroupListSnapshot; } -int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) { +static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) { SSdb *pSdb = pMnode->pSdb; // check all streams that involved this vnode should update the epset info @@ -2093,6 +2094,21 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } +typedef struct SMStreamNodeCheckMsg{} SMStreamNodeCheckMsg; + +static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + if (sdbGetSize(pSdb, SDB_STREAM) <= 0) { + return 0; + } + + SMStreamNodeCheckMsg *pMsg = rpcMallocCont(sizeof(SMStreamNodeCheckMsg)); + SRpcMsg rpcMsg = { .msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = sizeof(SMStreamNodeCheckMsg)}; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + return 0; +} + // todo: this process should be executed by the write queue worker of the mnode //int32_t mndProcessStreamHb(SRpcMsg *pReq) { // SMnode *pMnode = pReq->info.node; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 57f736de64..df9813ad9b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -171,6 +171,10 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, pTask->id.idStr); } else { + if (pBlock->type == STREAM_INPUT__TRANS_STATE) { + pTask->status.appendTranstateBlock = true; + } + int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); // input queue is full, upstream is blocked now status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;