From 90aa27ee23ceb9cafc28797f7b229cdcd7bc22d1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Mar 2025 13:57:31 +0800 Subject: [PATCH] fix(stream): handle TDMT_STREAM_CHKPT_EXEC in stream queue. --- source/dnode/mgmt/mgmt_snode/inc/smInt.h | 2 +- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 2 ++ source/dnode/mgmt/mgmt_snode/src/smWorker.c | 11 ++++++++++- source/dnode/snode/src/snode.c | 2 ++ tests/system-test/8-stream/checkpoint_info2.py | 2 ++ 5 files changed, 17 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_snode/inc/smInt.h b/source/dnode/mgmt/mgmt_snode/inc/smInt.h index 9d519e88f0..0df4b1c58a 100644 --- a/source/dnode/mgmt/mgmt_snode/inc/smInt.h +++ b/source/dnode/mgmt/mgmt_snode/inc/smInt.h @@ -47,7 +47,7 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pMsg); int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t smPutNodeMsgToWriteQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); -void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg); +int32_t smPutNodeMsgToChkptQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 024e2e4e99..11710d7b39 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -102,6 +102,8 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_CHKPT_EXEC, smPutNodeMsgToStreamQueue, 0) == NULL) goto _OVER; + code = 0; _OVER: if (code != 0) { diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 1e882fc656..1255542454 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -162,6 +162,9 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { case WRITE_QUEUE: code = smPutNodeMsgToWriteQueue(pMgmt, pMsg); break; + case STREAM_CHKPT_QUEUE: + code = smPutNodeMsgToStreamQueue(pMgmt, pMsg); + break; default: code = TSDB_CODE_INVALID_PARA; rpcFreeCont(pMsg->pCont); @@ -172,7 +175,6 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { } int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { - int32_t code = 0; SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, 0); if (pWorker == NULL) { return TSDB_CODE_INVALID_MSG; @@ -198,3 +200,10 @@ int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); return taosWriteQitem(pWorker->queue, pMsg); } + +//int32_t smPutNodeMsgToChkptQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { +// SSingleWorker *pWorker = &pMgmt->chkptWorker; +// +// dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); +// return taosWriteQitem(pWorker->queue, pMsg); +//} diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 326dcd712e..dcdc70da68 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -138,6 +138,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: return tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg); + case TDMT_STREAM_CHKPT_EXEC: + return tqStreamTaskProcessRunReq(pSnode->pMeta, pMsg, true); default: sndError("invalid snode msg:%d", pMsg->msgType); return TSDB_CODE_INVALID_MSG; diff --git a/tests/system-test/8-stream/checkpoint_info2.py b/tests/system-test/8-stream/checkpoint_info2.py index 3dc57477f7..f4c8da8c9d 100644 --- a/tests/system-test/8-stream/checkpoint_info2.py +++ b/tests/system-test/8-stream/checkpoint_info2.py @@ -22,6 +22,8 @@ from util.cluster import * # should be used by -N option class TDTestCase: updatecfgDict = {'checkpointInterval': 60 , + 'vdebugflag':143, + 'ddebugflag':143 } def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar)