diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index be7e6f1fdf..23b0553a3f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2111,11 +2111,12 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); if (p == NULL && p1 == NULL) { + mDebug("stream:0x%"PRIx64" %s not involved nodeUpdate, ignore", pStream->uid, pStream->name); mndReleaseStream(pMnode, pStream); continue; } - mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid); + mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans", pStream->uid, pStream->name); int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo); if (code != TSDB_CODE_SUCCESS) { sdbCancelFetch(pSdb, pIter); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index cd15595411..061efc94fb 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -301,6 +301,8 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { if (status == TASK_STATUS__SCAN_HISTORY) { stDebug("s-task:%s enter into scan-history data stage, status:%s", id, str); streamTaskStartScanHistory(pTask); + // start the related fill-history task, when current task is ready + streamLaunchFillHistoryTask(pTask); } else { // fill-history tasks are not allowed to reach here. if (pTask->info.fillHistory == 1) { @@ -312,9 +314,6 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { streamTaskEnablePause(pTask); } } - - // when current stream task is ready, check the related fill history task. - streamLaunchFillHistoryTask(pTask); } int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { @@ -370,14 +369,15 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs "s-task:%s downstream taskId:0x%x (vgId:%d) vnode-transfer/leader-change detected, not send check again, " "roll-back needed", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); - } else if (pRsp->status == TASK_SELF_NEW_STAGE) { - stError( - "s-task:%s vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, not send check " - "again, roll-back needed", - id, pRsp->oldStage, (int32_t)pTask->pMeta->stage); } else { - STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); + if (pRsp->status == TASK_SELF_NEW_STAGE) { + stError( + "s-task:%s vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, continue check " + "till downstream nodeUpdate", + id, pRsp->oldStage, (int32_t)pTask->pMeta->stage); + } + STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);