diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 84d0dd4982..ec00855666 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -27,6 +27,7 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 8c726f9f03..99777bbbb0 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -89,6 +89,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 4213541351..9336bb7141 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -838,6 +838,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index aed49809dd..57fd187da3 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -86,6 +86,10 @@ typedef struct SOrphanTask { int32_t nodeId; } SOrphanTask; +typedef struct { + SMsgHead head; +} SMStreamHbRspMsg, SMStreamReqCheckpointRspMsg; + int32_t mndInitStream(SMnode *pMnode); void mndCleanupStream(SMnode *pMnode); SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 7831048f2d..9aba428ff6 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -308,8 +308,8 @@ static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) { } } - mError("failed to find the vgId:%d for extract last version, total existed vgs:%d", vgId, size); - return -1; + mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size); + return 1; } static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8da56d2d46..3ef2f64df7 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -877,7 +877,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre int64_t ts = taosGetTimestampMs(); if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { // mWarn("checkpoint interval less than the threshold, ignore it"); - return -1; + return TSDB_CODE_SUCCESS; } bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); @@ -2179,5 +2179,16 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { mndReleaseStream(pMnode, pStream); taosThreadMutexUnlock(&execInfo.lock); + { + SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRspMsg)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + SMsgHead* pHead = rsp.pCont; + pHead->vgId = htonl(req.nodeId); + + tmsgSendRsp(&rsp); + + pReq->info.handle = NULL; // disable auto rsp + } + return 0; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 1d296a1c6e..14f3c533e3 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -16,10 +16,6 @@ #include "mndStream.h" #include "mndTrans.h" -typedef struct { - SMsgHead head; -} SMStreamHbRspMsg; - typedef struct SFailedCheckpointInfo { int64_t streamUid; int64_t checkpointId; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 34623c021b..e4b3cab256 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -180,6 +180,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg); case TDMT_MND_STREAM_HEARTBEAT_RSP: return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg); + case TDMT_MND_STREAM_REQ_CHKPT_RSP: + return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg); default: sndError("invalid snode msg:%d", pMsg->msgType); ASSERT(0); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b83af84086..3d45a0765e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -243,6 +243,7 @@ int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a3fd19c7f8..7459551329 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1225,3 +1225,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); } + +int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg); +} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 4e49091da4..b66a114ef8 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -938,9 +938,13 @@ int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); } -int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { +static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; return TSDB_CODE_SUCCESS; } + +int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } + +int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 86355cdbb0..7763729c2c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -798,6 +798,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); case TDMT_MND_STREAM_HEARTBEAT_RSP: return tqProcessStreamHbRsp(pVnode->pTq, pMsg); + case TDMT_MND_STREAM_REQ_CHKPT_RSP: + return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b63dc50836..9639921c77 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -934,9 +934,8 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { } tEncoderClear(&encoder); - SRpcMsg msg = {.info.noResp = 1}; + SRpcMsg msg = {0}; initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen); - stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId); tmsgSendReq(&pTask->info.mnodeEpset, &msg);