diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b7e323a213..f40a6c9338 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -272,6 +272,7 @@ typedef struct SStreamStatus { int8_t schedStatus; int8_t keepTaskStatus; bool transferState; + bool appendTranstateBlock; // has append the transfer state data block already int8_t timerActive; // timer is active int8_t pauseAllowed; // allowed task status to be set to be paused } SStreamStatus; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 3d9a91899c..a217bc2966 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -214,9 +214,11 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal anymore, set the transfer state flag", pTask->id.idStr, ver, pTask->dataRange.range.maxVer); - pTask->status.transferState = true; - - /*int32_t code = */streamSchedExec(pTask); + if (!pTask->status.appendTranstateBlock) { + pTask->status.appendTranstateBlock = true; + appendTranstateIntoInputQ(pTask); + /*int32_t code = */streamSchedExec(pTask); + } } } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index f85ade591c..fa24c01418 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -385,8 +385,9 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { destroyStreamDataBlock((SStreamDataBlock*) pItem); return code; } - } else if (type == STREAM_INPUT__CHECKPOINT) { + } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRANS_STATE) { taosWriteQitem(pTask->inputQueue->queue, pItem); + qDebug("s-task:%s trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 102c8805b5..c73868123c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -391,11 +391,11 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { streamTaskFillHistoryFinished(pTask); streamTaskEndScanWAL(pTask); - code = streamDoTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo handle this - return code; - } - } else if (level == TASK_LEVEL__AGG) { // do transfer task operator states. + code = streamDoTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo handle this + return code; + } + } else if (level == TASK_LEVEL__AGG) { // do transfer task operator states. code = streamDoTransferStateToStreamTask(pTask); if (code != TSDB_CODE_SUCCESS) { // todo handle this return code; @@ -484,9 +484,10 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock pTask->status.transferState = true; } - // dispatch the transtate block to downstream task immediately - if (level == TASK_LEVEL__SOURCE || level == TASK_LEVEL__AGG) { - // pBlock-> = pTask->id.taskId; + // dispatch the tran-state block to downstream task immediately + int32_t type = pTask->outputInfo.type; + if ((level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) && + (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH)) { pBlock->srcVgId = pTask->pMeta->vgId; code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); if (code == 0) { @@ -640,10 +641,10 @@ int32_t streamTryExec(SStreamTask* pTask) { // the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by // call this function (streamExecForAll) directly. - code = streamExecForAll(pTask); - if (code < 0) { +// code = streamExecForAll(pTask); +// if (code < 0) { // do nothing - } +// } } atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index b46ded6ca7..708524bf10 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -427,6 +427,8 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { return TSDB_CODE_OUT_OF_MEMORY; } + pTranstate->type = STREAM_INPUT__TRANS_STATE; + pBlock->info.type = STREAM_TRANS_STATE; pBlock->info.rows = 1; pBlock->info.childId = pTask->info.selfChildId; @@ -440,7 +442,10 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { return TSDB_CODE_OUT_OF_MEMORY; } + qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus); + pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; streamSchedExec(pTask); + return TSDB_CODE_SUCCESS; }