diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 3054179416..d363031db1 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -211,12 +211,13 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { const char* id = pTask->id.idStr; + int64_t maxVer = pTask->dataRange.range.maxVer; if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { if (!pTask->status.appendTranstateBlock) { qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal anymore, add transfer-state block into inputQ", - id, ver, pTask->dataRange.range.maxVer); + id, ver, maxVer); double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); @@ -224,7 +225,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { /*int32_t code = */streamSchedExec(pTask); } else { qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal", - id, ver, pTask->dataRange.range.maxVer); + id, ver, maxVer); } } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 06861454d1..557b92baf9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -757,10 +757,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i qDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id); ASSERT(pTask->info.fillHistory == 1); code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens -// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); } + + streamFreeQitem(pTask->msgInfo.pData); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ccfa331661..cebae0801d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -534,16 +534,21 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock } else { streamFreeQitem((SStreamQueueItem*)pBlock); } + } else { // level == TASK_LEVEL__SINK + streamFreeQitem((SStreamQueueItem*)pBlock); } } else { // non-dispatch task, do task state transfer directly - qDebug("s-task:%s non-dispatch task, start to transfer state directly", id); - streamFreeQitem((SStreamQueueItem*)pBlock); - ASSERT(pTask->info.fillHistory == 1); - code = streamTransferStateToStreamTask(pTask); + if (level != TASK_LEVEL__SINK) { + qDebug("s-task:%s non-dispatch task, start to transfer state directly", id); + ASSERT(pTask->info.fillHistory == 1); + code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + if (code != TSDB_CODE_SUCCESS) { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + } + } else { + qDebug("s-task:%d sink task does not transfer state", id); } } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index c3d4d4c7ae..0a1a15259c 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -400,7 +400,6 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { } pTask->status.appendTranstateBlock = true; - qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus); return TSDB_CODE_SUCCESS; }