From 6fa54789cb40a08be8bea5a28a6532aa3d4b534f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Sep 2023 17:22:31 +0800 Subject: [PATCH] fix(stream): disable follower send hb to mnode. and do some internal refactor. --- include/libs/stream/tstream.h | 6 ++++- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 17 ++++++++----- source/dnode/vnode/src/tq/tqStreamTask.c | 13 ++++------ source/dnode/vnode/src/tq/tqUtil.c | 11 +++++--- source/dnode/vnode/src/vnd/vnodeSync.c | 22 +++++++++++++--- source/libs/stream/src/streamMeta.c | 32 ++++++++++++++++++------ 7 files changed, 73 insertions(+), 30 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 60043d4df6..932a6d951b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -400,6 +400,8 @@ typedef struct SStreamMeta { FTaskExpand* expandFunc; int32_t vgId; int64_t stage; + bool leader; + int8_t taskWillbeLaunched; SRWLatch lock; // TdThreadRwlock lock; int32_t walScanCounter; @@ -408,7 +410,8 @@ typedef struct SStreamMeta { SHashObj* pTaskBackendUnique; TdThreadMutex backendMutex; SMetaHbInfo hbInfo; - int32_t closedTask; + SHashObj* pUpdateTaskList; +// int32_t closedTask; int32_t totalTasks; // this value should be increased when a new task is added into the meta int32_t chkptNotReadyTasks; int64_t rid; @@ -722,6 +725,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); +void streamMetaStartHb(SStreamMeta* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index c6a424666c..4e73a481c8 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -174,7 +174,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset); -void tqUpdateNodeStage(STQ* pTq); +void tqUpdateNodeStage(STQ* pTq, bool isLeader); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ab6e0d1171..fa734096c9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1756,10 +1756,10 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { streamTaskStop(*ppHTask); } - pMeta->closedTask += 1; + taosHashPut(pMeta->pUpdateTaskList, &pTask->id, sizeof(pTask->id), NULL, 0); if (ppHTask != NULL) { tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr); - pMeta->closedTask += 1; + taosHashPut(pMeta->pUpdateTaskList, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); } else { tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); } @@ -1768,11 +1768,14 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // possibly only handle the stream task. int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - if (pMeta->closedTask < numOfTasks) { - tqDebug("vgId:%d closed tasks:%d, unclosed:%d", vgId, pMeta->closedTask, (numOfTasks - pMeta->closedTask)); + int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskList); + if (updateTasks < numOfTasks) { + pMeta->taskWillbeLaunched = 1; + + tqDebug("vgId:%d closed tasks:%d, unclosed:%d", vgId, updateTasks, (numOfTasks - updateTasks)); taosWUnLockLatch(&pMeta->lock); } else { - pMeta->closedTask = 0; + taosHashClear(pMeta->pUpdateTaskList); if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId); @@ -1794,12 +1797,14 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { return -1; } - taosWUnLockLatch(&pMeta->lock); if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { vInfo("vgId:%d, restart all stream tasks", vgId); tqStartStreamTasks(pTq); tqCheckAndRunStreamTaskAsync(pTq); } + + pMeta->taskWillbeLaunched = 0; + taosWUnLockLatch(&pMeta->lock); } } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 255f71bf30..8c45aa4f8c 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -111,12 +111,12 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; - taosWLockLatch(&pMeta->lock); +// taosWLockLatch(&pMeta->lock); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqDebug("vgId:%d no stream tasks existed to run", vgId); - taosWUnLockLatch(&pMeta->lock); +// taosWUnLockLatch(&pMeta->lock); return 0; } @@ -124,7 +124,7 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); - taosWUnLockLatch(&pMeta->lock); +// taosWUnLockLatch(&pMeta->lock); return -1; } @@ -135,7 +135,7 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); - taosWUnLockLatch(&pMeta->lock); +// taosWUnLockLatch(&pMeta->lock); return 0; } @@ -237,8 +237,6 @@ int32_t tqStartStreamTasks(STQ* pTq) { return TSDB_CODE_SUCCESS; } - taosWLockLatch(&pMeta->lock); - for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); @@ -246,12 +244,11 @@ int32_t tqStartStreamTasks(STQ* pTq) { SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key)); int8_t status = (*pTask)->status.taskStatus; - if (status == TASK_STATUS__STOP) { + if (status == TASK_STATUS__STOP && (*pTask)->info.fillHistory != 1) { streamSetStatusNormal(*pTask); } } - taosWUnLockLatch(&pMeta->lock); return 0; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 04695c1f63..f10f87b6b7 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -36,10 +36,15 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { return 0; } -void tqUpdateNodeStage(STQ* pTq) { +void tqUpdateNodeStage(STQ* pTq, bool isLeader) { SSyncState state = syncGetState(pTq->pVnode->sync); - pTq->pStreamMeta->stage = state.term; - tqDebug("vgId:%d update the meta stage to be:%"PRId64, pTq->pStreamMeta->vgId, pTq->pStreamMeta->stage); + SStreamMeta* pMeta = pTq->pStreamMeta; + tqDebug("vgId:%d update the meta stage:%"PRId64", prev:%"PRId64" leader:%d", pMeta->vgId, state.term, pMeta->stage, isLeader); + pMeta->stage = state.term; + pMeta->leader = isLeader; + if (isLeader) { + streamMetaStartHb(pMeta); + } } static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 3a7a60fcbb..9a4dfc8c11 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -549,9 +549,20 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm)); walApplyVer(pVnode->pWal, commitIdx); - pVnode->restored = true; + if (!pVnode->pTq->pStreamMeta->taskWillbeLaunched) { + vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); + return; + } + + taosWLockLatch(&pVnode->pTq->pStreamMeta->lock); + if (!pVnode->pTq->pStreamMeta->taskWillbeLaunched) { + vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); + taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock); + return; + } + if (vnodeIsRoleLeader(pVnode)) { // start to restore all stream tasks if (tsDisableStream) { @@ -564,6 +575,8 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) } else { vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); } + + taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock); } static void vnodeBecomeFollower(const SSyncFSM *pFsm) { @@ -578,7 +591,10 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) { } taosThreadMutexUnlock(&pVnode->lock); - tqStopStreamTasks(pVnode->pTq); + if (pVnode->pTq) { + tqUpdateNodeStage(pVnode->pTq, false); + tqStopStreamTasks(pVnode->pTq); + } } static void vnodeBecomeLearner(const SSyncFSM *pFsm) { @@ -597,7 +613,7 @@ static void vnodeBecomeLearner(const SSyncFSM *pFsm) { static void vnodeBecomeLeader(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; if (pVnode->pTq) { - tqUpdateNodeStage(pVnode->pTq); + tqUpdateNodeStage(pVnode->pTq, true); } vDebug("vgId:%d, become leader", pVnode->config.vgId); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 55d9a46b11..9fa9a664b9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -140,6 +140,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } + pMeta->pUpdateTaskList = taosHashInit(64, fp, false, HASH_NO_LOCK); + if (pMeta->pUpdateTaskList == NULL) { + goto _err; + } + // task list pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId)); if (pMeta->pTaskList == NULL) { @@ -316,6 +321,7 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->pTasks); taosHashCleanup(pMeta->pTaskBackendUnique); + taosHashCleanup(pMeta->pUpdateTaskList); taosMemoryFree(pMeta->path); taosThreadMutexDestroy(&pMeta->backendMutex); @@ -758,9 +764,8 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { return 0; } -static bool readyToSendHb(SMetaHbInfo* pInfo) { - if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { - // reset the counter +static bool enoughTimeDuration(SMetaHbInfo* pInfo) { + if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter pInfo->tickCounter = 0; return true; } @@ -784,7 +789,14 @@ void metaHbToMnode(void* param, void* tmrId) { return; } - if (!readyToSendHb(&pMeta->hbInfo)) { + // not leader not send msg + if (!pMeta->leader) { + qInfo("vgId:%d follower not send hb to mnode", pMeta->vgId); + taosReleaseRef(streamMetaId, rid); + return; + } + + if (!enoughTimeDuration(&pMeta->hbInfo)) { taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr); taosReleaseRef(streamMetaId, rid); return; @@ -907,10 +919,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { taosWUnLockLatch(&pMeta->lock); // wait for the stream meta hb function stopping - pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP; - while (pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) { - taosMsleep(100); - qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); + if (pMeta->leader) { + pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP; + while (pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) { + taosMsleep(100); + qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); + } } qDebug("vgId:%d start to check all tasks", vgId); @@ -924,3 +938,5 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { int64_t el = taosGetTimestampMs() - st; qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el); } + +void streamMetaStartHb(SStreamMeta* pMeta) { metaHbToMnode(pMeta, NULL); }