From 15573efbbc57a27c7a8c980a3cf5fa8fea5cfe5b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Jul 2024 14:57:40 +0800 Subject: [PATCH 1/5] fix(stream):commit the update of table meta. --- source/libs/stream/src/streamCheckpoint.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index cdb5bf0b50..e555db82f6 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -442,6 +442,9 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV id, vgId, pReq->taskId, numOfTasks); } streamMetaWLock(pMeta); + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } } return TSDB_CODE_SUCCESS; From 4e684c400daecbf796e5c727854e6e9e0656f32f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Jul 2024 15:12:05 +0800 Subject: [PATCH 2/5] fix(stream): ignore the invalid_vgroup_id error for checkpoint_trans. --- source/dnode/mnode/impl/src/mndStream.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 415d1ff9f0..223c29638f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -966,8 +966,8 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask return -1; } - code = - setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, 0); + code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, + TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != 0) { taosMemoryFree(buf); } From 93c06be262bcbfb710922d62611853ff7d69fc0a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Jul 2024 17:03:21 +0800 Subject: [PATCH 3/5] fix(stream): free checkpoint trigger block in case of redundant trigger block recved. --- source/libs/stream/src/streamCheckpoint.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e555db82f6..b490b0e02a 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -169,6 +169,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard", id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId); taosThreadMutexUnlock(&pTask->lock); + + streamFreeQitem((SStreamQueueItem*)pBlock); return TSDB_CODE_SUCCESS; } @@ -188,6 +190,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId); taosThreadMutexUnlock(&pTask->lock); + + streamFreeQitem((SStreamQueueItem*)pBlock); return TSDB_CODE_SUCCESS; } @@ -197,6 +201,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock " discard", id, vgId, pActiveInfo->activeId, checkpointId); taosThreadMutexUnlock(&pTask->lock); + + streamFreeQitem((SStreamQueueItem*)pBlock); return TSDB_CODE_SUCCESS; } else { // checkpointId == pActiveInfo->activeId if (pActiveInfo->allUpstreamTriggerRecv == 1) { @@ -205,6 +211,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock "checkpointId:%" PRId64 " transId:%d", id, vgId, checkpointId, transId); taosThreadMutexUnlock(&pTask->lock); + streamFreeQitem((SStreamQueueItem*)pBlock); return TSDB_CODE_SUCCESS; } @@ -219,6 +226,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs); taosThreadMutexUnlock(&pTask->lock); + streamFreeQitem((SStreamQueueItem*)pBlock); return TSDB_CODE_SUCCESS; } } From 8c024c85546c9bd05f12ca4148ae8ed86d392bd7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 12 Jul 2024 11:43:14 +0800 Subject: [PATCH 4/5] fix(stream): add some logs. --- source/dnode/mnode/impl/src/mndStream.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 223c29638f..1d20b16830 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2317,6 +2317,8 @@ static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList) { } taosHashCleanup(pHash); + + mDebug("numOfNodes for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList)); return TSDB_CODE_SUCCESS; } @@ -2905,7 +2907,6 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) { } addAllStreamTasksIntoBuf(pMnode, pExecInfo); - extractNodeListFromStream(pMnode, pExecInfo->pNodeList); pExecInfo->initTaskList = true; } From cb42806148784530a7a2f70ca1349cc818850846 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 12 Jul 2024 13:31:39 +0800 Subject: [PATCH 5/5] fix(stream): update some logs. --- source/dnode/mnode/impl/src/mndStream.c | 3 +-- source/dnode/mnode/impl/src/mndStreamHb.c | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 1d20b16830..d57dc6e52e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -59,7 +59,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); -//static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg); static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg); static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code); @@ -2318,7 +2317,7 @@ static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList) { taosHashCleanup(pHash); - mDebug("numOfNodes for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList)); + mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList)); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 1452ac77d2..bc10ec211d 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -255,7 +255,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mndInitStreamExecInfo(pMnode, &execInfo); if (!validateHbMsg(execInfo.pNodeList, req.vgId)) { - mError("invalid hbMsg from vgId:%d, discarded", req.vgId); + mError("vgId:%d not exists in nodeList buf, discarded", req.vgId); terrno = TSDB_CODE_INVALID_MSG; doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);