From 81a1eac87a9e17a240631f34720518f982bb6f32 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 26 Feb 2024 22:49:38 +0800 Subject: [PATCH 1/7] fix(stream): add some logs. --- source/dnode/mnode/impl/src/mndScheduler.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index ef9a7205e1..7831048f2d 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -300,14 +300,15 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* } static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) { - for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + int32_t size = (int32_t) taosArrayGetSize(pList); + for (int32_t i = 0; i < size; ++i) { SVgroupVer* pVer = taosArrayGet(pList, i); if (pVer->vgId == vgId) { return pVer->ver; } } - mError("failed to find the vgId:%d for extract last version", vgId); + mError("failed to find the vgId:%d for extract last version, total existed vgs:%d", vgId, size); return -1; } @@ -472,6 +473,9 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam); if (code != 0) { + mError("create stream task, code:%s", tstrerror(code)); + + // todo drop the added source tasks. sdbRelease(pSdb, pVgroup); return code; } From dafac591f7d380c651bb61ea4b7c999b77150c69 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Feb 2024 10:53:25 +0800 Subject: [PATCH 2/7] enh(stream): add rsp for req-checkpoint --- include/dnode/vnode/tqCommon.h | 1 + source/dnode/mgmt/mgmt_snode/src/smHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/mnode/impl/inc/mndStream.h | 4 ++++ source/dnode/mnode/impl/src/mndScheduler.c | 4 ++-- source/dnode/mnode/impl/src/mndStream.c | 13 ++++++++++++- source/dnode/mnode/impl/src/mndStreamHb.c | 4 ---- source/dnode/snode/src/snode.c | 2 ++ source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 4 ++++ source/dnode/vnode/src/tqCommon/tqCommon.c | 6 +++++- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 ++ source/libs/stream/src/streamTask.c | 3 +-- 13 files changed, 36 insertions(+), 10 deletions(-) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 84d0dd4982..ec00855666 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -27,6 +27,7 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 8c726f9f03..99777bbbb0 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -89,6 +89,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 4213541351..9336bb7141 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -838,6 +838,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index aed49809dd..57fd187da3 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -86,6 +86,10 @@ typedef struct SOrphanTask { int32_t nodeId; } SOrphanTask; +typedef struct { + SMsgHead head; +} SMStreamHbRspMsg, SMStreamReqCheckpointRspMsg; + int32_t mndInitStream(SMnode *pMnode); void mndCleanupStream(SMnode *pMnode); SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 7831048f2d..9aba428ff6 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -308,8 +308,8 @@ static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) { } } - mError("failed to find the vgId:%d for extract last version, total existed vgs:%d", vgId, size); - return -1; + mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size); + return 1; } static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8da56d2d46..3ef2f64df7 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -877,7 +877,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre int64_t ts = taosGetTimestampMs(); if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { // mWarn("checkpoint interval less than the threshold, ignore it"); - return -1; + return TSDB_CODE_SUCCESS; } bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); @@ -2179,5 +2179,16 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { mndReleaseStream(pMnode, pStream); taosThreadMutexUnlock(&execInfo.lock); + { + SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRspMsg)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + SMsgHead* pHead = rsp.pCont; + pHead->vgId = htonl(req.nodeId); + + tmsgSendRsp(&rsp); + + pReq->info.handle = NULL; // disable auto rsp + } + return 0; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 1d296a1c6e..14f3c533e3 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -16,10 +16,6 @@ #include "mndStream.h" #include "mndTrans.h" -typedef struct { - SMsgHead head; -} SMStreamHbRspMsg; - typedef struct SFailedCheckpointInfo { int64_t streamUid; int64_t checkpointId; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 34623c021b..e4b3cab256 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -180,6 +180,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg); case TDMT_MND_STREAM_HEARTBEAT_RSP: return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg); + case TDMT_MND_STREAM_REQ_CHKPT_RSP: + return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg); default: sndError("invalid snode msg:%d", pMsg->msgType); ASSERT(0); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b83af84086..3d45a0765e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -243,6 +243,7 @@ int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a3fd19c7f8..7459551329 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1225,3 +1225,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); } + +int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg); +} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 4e49091da4..b66a114ef8 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -938,9 +938,13 @@ int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); } -int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { +static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; return TSDB_CODE_SUCCESS; } + +int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } + +int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 86355cdbb0..7763729c2c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -798,6 +798,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); case TDMT_MND_STREAM_HEARTBEAT_RSP: return tqProcessStreamHbRsp(pVnode->pTq, pMsg); + case TDMT_MND_STREAM_REQ_CHKPT_RSP: + return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b63dc50836..9639921c77 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -934,9 +934,8 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { } tEncoderClear(&encoder); - SRpcMsg msg = {.info.noResp = 1}; + SRpcMsg msg = {0}; initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen); - stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId); tmsgSendReq(&pTask->info.mnodeEpset, &msg); From aee0460daf81110ec9175e84a0e5b3a7204236e9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Feb 2024 11:15:09 +0800 Subject: [PATCH 3/7] enh(stream):rsp the checkpoint ready msg. --- include/dnode/vnode/tqCommon.h | 1 + source/dnode/mgmt/mgmt_snode/src/smHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/snode/src/snode.c | 2 ++ source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 4 ++++ source/dnode/vnode/src/tqCommon/tqCommon.c | 4 ++++ source/dnode/vnode/src/vnd/vnodeSvr.c | 2 ++ source/libs/stream/src/streamDispatch.c | 1 - 9 files changed, 16 insertions(+), 1 deletion(-) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index ec00855666..22a176f0bb 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -28,6 +28,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 99777bbbb0..1b1dcc9b54 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -87,6 +87,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 9336bb7141..bfac0bab9d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -835,6 +835,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index e4b3cab256..3bef5b595b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -182,6 +182,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg); case TDMT_MND_STREAM_REQ_CHKPT_RSP: return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg); + case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: + return tqStreamProcessCheckpointReadyRsp(pSnode->pMeta, pMsg); default: sndError("invalid snode msg:%d", pMsg->msgType); ASSERT(0); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 3d45a0765e..06051ee5c8 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -244,6 +244,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7459551329..e442af4a3d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1229,3 +1229,7 @@ int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg); } + +int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamProcessCheckpointReadyRsp(pTq->pStreamMeta, pMsg); +} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b66a114ef8..c1c10b2fc2 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -948,3 +948,7 @@ static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } + +int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + return doProcessDummyRspMsg(pMeta, pMsg); +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7763729c2c..84cbbfd4b2 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -800,6 +800,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessStreamHbRsp(pVnode->pTq, pMsg); case TDMT_MND_STREAM_REQ_CHKPT_RSP: return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: + return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 60c545b9e5..78b914c3db 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -907,7 +907,6 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, SStreamChkptReadyInfo info = {.upStreamTaskId = pInfo->taskId, .upstreamNodeEpset = pInfo->epSet}; initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead)); - info.msg.info.noResp = 1; // refactor later. stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d, vgId:%d", From 80d027da196ffde1a7d7b2b1c58befaf34dcd25c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Feb 2024 11:22:58 +0800 Subject: [PATCH 4/7] fix(stream): restore from the crash. --- source/dnode/vnode/src/tq/tq.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e442af4a3d..e279b4bc1d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1175,7 +1175,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) return TSDB_CODE_SUCCESS; } } else { - ASSERT(status == TASK_STATUS__HALT); +// ASSERT(status == TASK_STATUS__HALT); + if (status != TASK_STATUS__HALT) { + tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr); + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); + } } // check if the checkpoint msg already sent or not. From 402a43bf67143a71521dfdafbdc0c4f9a084be50 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Feb 2024 11:25:32 +0800 Subject: [PATCH 5/7] fix(stream): disable halt. --- source/dnode/vnode/src/tq/tq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e279b4bc1d..011e62cb89 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1178,7 +1178,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // ASSERT(status == TASK_STATUS__HALT); if (status != TASK_STATUS__HALT) { tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr); - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); +// streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); } } From de1c67f77381e28af14d76621dfe0a511cd3f6bc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Feb 2024 12:57:43 +0800 Subject: [PATCH 6/7] fix(stream): rsp the checkpoint ready msg. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c1c10b2fc2..e3dbaa8803 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -485,6 +485,10 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe return code; } +typedef struct SMStreamCheckpointReadyRspMsg { + int8_t placeholder; +}SMStreamCheckpointReadyRspMsg; + int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -513,6 +517,18 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) streamProcessCheckpointReadyMsg(pTask); streamMetaReleaseTask(pMeta, pTask); + + { // send checkpoint ready rsp + SRpcMsg rsp = {.code = 0, .info = pMsg->info, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + SMsgHead* pHead = rsp.pCont; + pHead->vgId = htonl(req.downstreamNodeId); + + tmsgSendRsp(&rsp); + + pMsg->info.handle = NULL; // disable auto rsp + } + return code; } From d05c55f3fe40e5bc16432f1da3a726a85f298b97 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Feb 2024 12:59:08 +0800 Subject: [PATCH 7/7] fix(stream): update the msg struct. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index e3dbaa8803..a2d45062b9 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -486,7 +486,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe } typedef struct SMStreamCheckpointReadyRspMsg { - int8_t placeholder; + SMsgHead head; }SMStreamCheckpointReadyRspMsg; int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {