From 83f84d92bc89a7365bcc42f94308a39d3421beb8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 28 Sep 2023 22:44:13 +0800 Subject: [PATCH] refactor: do some internal refactor (discard the checkpoint source msg during restoring). --- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 14 +++++++++++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/stream/src/streamMeta.c | 3 +-- source/libs/stream/src/streamRecover.c | 19 +++++++------------ 5 files changed, 21 insertions(+), 19 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 1e07c87cb2..2fab7c087a 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -224,7 +224,7 @@ int tqPushMsg(STQ*, tmsg_t msgType); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqScanWalAsync(STQ* pTq, bool ckPause); -int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); +int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDataVerUpdateReq(STQ* pTq, char* pMsg, int32_t msgLen); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b523faec7f..9780a1f046 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1638,7 +1638,7 @@ FAIL: } // todo error code cannot be return, since this is invoked by an mnode-launched transaction. -int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) { +int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -1648,6 +1648,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs // disable auto rsp to source pRsp->info.handle = NULL; + // todo: add counter to make sure other tasks would not be trapped in checkpoint state SStreamCheckpointSourceReq req = {0}; if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId); @@ -1657,6 +1658,14 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs return TSDB_CODE_SUCCESS; } + if (!pTq->pVnode->restored) { + tqDebug("vgId:%d checkpoint-source msg received during restoring, ignore it", vgId); + SRpcMsg rsp = {0}; + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; + } + SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) { @@ -1680,6 +1689,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs return TSDB_CODE_SUCCESS; } + // todo: handle the partial failure cases // downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req. if (pTask->status.downstreamReady != 1) { qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64 @@ -1731,8 +1741,6 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs return code; } - // todo: when generating checkpoint, no new tasks are allowed to add into current Vnode - // todo: when generating checkpoint, leader of mnode has transfer to other DNode? streamMetaReleaseTask(pMeta, pTask); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7a1e60f075..5622568b7b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -605,7 +605,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp); break; case TDMT_VND_STREAM_CHECK_POINT_SOURCE: - tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg, pRsp); + tqProcessTaskCheckPointSourceReq(pVnode->pTq, pMsg, pRsp); break; case TDMT_VND_STREAM_TASK_UPDATE: tqProcessTaskUpdateReq(pVnode->pTq, pMsg); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 840eee98a8..5470da8360 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -716,9 +716,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { taosArrayPush(pMeta->pTaskList, &pTask->id); } else { + // todo this should replace the existed object put by replay creating stream task msg from mnode stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId); - ASSERT(0); - tdbFree(pKey); tdbFree(pVal); taosMemoryFree(pTask); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 3e491fa1bf..67f4108270 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -583,16 +583,12 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) { SDataRange* pRange = &pHTask->dataRange; - pRange->range.minVer = 0; - // todo remove this // the query version range should be limited to the already processed data + pRange->range.minVer = 0; pRange->range.maxVer = pTask->chkInfo.nextProcessVer - 1; - if (pRange->range.maxVer < pRange->range.minVer) { - pRange->range.maxVer = pRange->range.minVer; - } - pHTask->execInfo.init = taosGetTimestampMs(); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64 " ver range:%" PRId64 " - %" PRId64", init:%"PRId64, @@ -890,9 +886,8 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { if (pTask->hTaskInfo.id.taskId == 0) { if (pTask->info.fillHistory == 1) { - stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 - "-%" PRId64, - pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); + stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64, + pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); } else { stDebug( "s-task:%s no related fill-history task, stream time window and verRange are not set. default stream time " @@ -915,9 +910,9 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { pRange->range.maxVer = ver; stDebug("s-task:%s level:%d related fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64 - ", verRang:%" PRId64 " - %" PRId64, - pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, - pRange->range.maxVer); + ", verRang:%" PRId64 " - %" PRId64, + pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, + pRange->range.maxVer); } }