From 128f67784a5a4a4a8822b765f03f1fb61a4bddc1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 7 Jul 2023 18:17:50 +0800 Subject: [PATCH] fix(stream): fix error in parsing checkpoint msg. --- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 19 ++++++++++++++----- source/dnode/vnode/src/vnd/vnodeSvr.c | 8 ++------ 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 8aabaac0bb..586e692d77 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -219,7 +219,7 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. -int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); +int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); int32_t tqProcessStreamCheckPointRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); int32_t tqCheckStreamStatus(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 34ac0540d8..9f8c0551de 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1492,14 +1492,14 @@ FAIL: int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; } -int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) { +int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; - char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead)); - int32_t len = msgLen - sizeof(SMsgHead); + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t code = 0; - SStreamCheckpointSourceReq req= {0}; + SStreamCheckpointSourceReq req = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); @@ -1524,11 +1524,20 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* pMs taosArrayPush(pTask->pRpcMsgList, &pRpcMsg); // todo: when generating checkpoint, no new tasks are allowed to add into current Vnode + // set the initial value for generating check point + int32_t total = 0; taosWLockLatch(&pMeta->lock); - pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList); + if (pMeta->chkptNotReadyTasks == 0) { + pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList); + } + + total = taosArrayGetSize(pMeta->pTaskList); taosWUnLockLatch(&pMeta->lock); + qDebug("s-task:%s level:%d receive the checkpoint msg id:%" PRId64 " from mnode, total source checkpoint req:%d", + pTask->id.idStr, pTask->info.taskLevel, req.checkpointId, total); + streamProcessCheckpointSourceReq(pMeta, pTask, &req); streamMetaReleaseTask(pMeta, pTask); return code; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 28eb495c70..398fa6d166 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -498,12 +498,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } } break; - case TDMT_VND_STREAM_CHECK_POINT_SOURCE: { - if (tqProcessStreamCheckPointSourceReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { - goto _err; - } - } break; - case TDMT_STREAM_TASK_CHECKPOINT: { if (tqProcessStreamCheckPointReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { goto _err; @@ -688,6 +682,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); + case TDMT_VND_STREAM_CHECK_POINT_SOURCE: + return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR;