From 5bffb0c6754a3fd66fd15ace237821bced3098e3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 29 Aug 2024 14:21:23 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 4 +++- source/dnode/vnode/src/tq/tqStreamTask.c | 6 +++++ source/dnode/vnode/src/tqCommon/tqCommon.c | 23 ++++++++---------- source/libs/stream/src/streamCheckpoint.c | 26 ++++++++------------ source/libs/stream/src/streamHb.c | 4 +++- source/libs/stream/src/streamStartTask.c | 28 ++++++++++++++++++---- 6 files changed, 56 insertions(+), 35 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 818d8cfdd4..20f91106a5 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -785,7 +785,9 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); bool streamMetaAllTasksReady(const SStreamMeta* pMeta); int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask); -int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts); +int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts); +void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts); +void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts); // timer int32_t streamTimerGetInstance(tmr_h* pTmr); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 4b206fc04f..4c44280311 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -138,6 +138,12 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return 0; } + if (pMeta->startInfo.startAllTasks) { + tqTrace("vgId:%d in restart procedure, not scan wal", vgId); + streamMetaWUnLock(pMeta); + return 0; + } + pMeta->scanInfo.scanCounter += 1; if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index cad2ca3eb0..29f050d5e4 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1191,14 +1191,13 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { } int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - int32_t vgId = pMeta->vgId; - int32_t code = 0; - - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - int64_t now = taosGetTimestampMs(); - + int32_t vgId = pMeta->vgId; + int32_t code = 0; + SStreamTask* pTask = NULL; SRestoreCheckpointInfo req = {0}; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int64_t now = taosGetTimestampMs(); SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); @@ -1211,7 +1210,6 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tDecoderClear(&decoder); - SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if (pTask == NULL || (code != 0)) { tqError( @@ -1238,9 +1236,10 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMutexLock(&pTask->lock); ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId); - if (pTask->status.consenChkptInfo.consenChkptTransId >= req.transId) { + SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo; + if (pConsenInfo->consenChkptTransId >= req.transId) { tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId, - pTask->status.consenChkptInfo.consenChkptTransId, req.transId); + pConsenInfo->consenChkptTransId, req.transId); streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; @@ -1256,9 +1255,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { pTask->id.idStr, vgId, req.checkpointId, req.transId); } - pTask->status.consenChkptInfo.consenChkptTransId = req.transId; - pTask->status.consenChkptInfo.status = TASK_CONSEN_CHKPT_RECV; - pTask->status.consenChkptInfo.statusTs = taosGetTimestampMs(); + streamTaskSetConsenChkptIdRecv(pTask, req.transId, now); streamMutexUnlock(&pTask->lock); if (pMeta->role == NODE_ROLE_LEADER) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 565a3e35e7..da882505f4 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -615,7 +615,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointTime = pReq->checkpointTs; - if (restored) { + if (restored && (pMeta->role == NODE_ROLE_LEADER)) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); } } @@ -1371,29 +1371,23 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { } int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { - const char* id = pTask->id.idStr; - SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo; - streamMutexLock(&pTask->lock); ETaskStatus p = streamTaskGetStatus(pTask).state; -// if (pInfo->alreadySendChkptId == true) { -// stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); -// streamMutexUnlock(&pTask->lock); -// return TSDB_CODE_SUCCESS; -// } else { -// pInfo->alreadySendChkptId = true; -// } -// + // if (pInfo->alreadySendChkptId == true) { + // stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); + // streamMutexUnlock(&pTask->lock); + // return TSDB_CODE_SUCCESS; + // } else { + // pInfo->alreadySendChkptId = true; + // } + // + streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs()); streamMutexUnlock(&pTask->lock); if (pTask->pBackend != NULL) { streamFreeTaskState(pTask, p); pTask->pBackend = NULL; } - - pInfo->status = TASK_CONSEN_CHKPT_REQ; - pInfo->statusTs = taosGetTimestampMs(); - stDebug("s-task:%s set the require consensus-checkpointId flag, ts:%" PRId64, id, pInfo->statusTs); return 0; } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index ec65c274cf..7a703ae30c 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -197,10 +197,12 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { } } - entry.checkpointInfo.consensusChkptId = streamTaskSetReqConsensusChkptId(*pTask, pMsg->ts); + streamMutexLock(&(*pTask)->lock); + entry.checkpointInfo.consensusChkptId = streamTaskCheckIfReqConsenChkptId(*pTask, pMsg->ts); if (entry.checkpointInfo.consensusChkptId) { entry.checkpointInfo.consensusTs = pMsg->ts; } + streamMutexUnlock(&(*pTask)->lock); if ((*pTask)->exec.pWalReader != NULL) { entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1; diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 90987e3fba..d5d6163009 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -444,7 +444,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { return 0; } -int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) { +int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) { SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo; int32_t vgId = pTask->pMeta->vgId; @@ -455,11 +455,13 @@ int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) { vgId, pConChkptInfo->statusTs); return 1; } else { - if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && (ts - pConChkptInfo->statusTs) > 60 * 1000) { + int64_t el = (ts - pConChkptInfo->statusTs) / 1000; + if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && el > 60) { pConChkptInfo->statusTs = ts; - stWarn("s-task:%s vgId:%d not recv consensus-chkptId for 60s, set requiring in Hb again, ts:%" PRId64, - pTask->id.idStr, vgId, pConChkptInfo->statusTs); + stWarn( + "s-task:%s vgId:%d not recv consensus-chkptId for %ds(more than 60s), set requiring in Hb again, ts:%" PRId64, + pTask->id.idStr, vgId, el, pConChkptInfo->statusTs); return 1; } } @@ -467,4 +469,22 @@ int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) { return 0; } +void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts) { + SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo; + pInfo->consenChkptTransId = transId; + pInfo->status = TASK_CONSEN_CHKPT_RECV; + pInfo->statusTs = ts; + stDebug("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId); +} + +void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) { + SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo; + int32_t prevTrans = pInfo->consenChkptTransId; + + pInfo->status = TASK_CONSEN_CHKPT_REQ; + pInfo->statusTs = ts; + pInfo->consenChkptTransId = 0; + + stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts); +}