diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e586b1a028..8ed76d3918 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -62,15 +62,12 @@ enum { enum { TASK_INPUT_STATUS__NORMAL = 1, TASK_INPUT_STATUS__BLOCKED, - TASK_INPUT_STATUS__RECOVER, - TASK_INPUT_STATUS__STOP, TASK_INPUT_STATUS__FAILED, }; enum { TASK_OUTPUT_STATUS__NORMAL = 1, TASK_OUTPUT_STATUS__WAIT, - TASK_OUTPUT_STATUS__BLOCKED, }; enum { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 848468d3e0..f054c07feb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -761,17 +761,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver, int64_t checkpoi pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; - // checkpoint exists, restore from the last checkpoint - // if (pTask->chkInfo.checkpointId != 0) { - // ASSERT(pTask->chkInfo.checkpointVer > 0); - // pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer; - // pTask->dataRange.range.maxVer = pTask->chkInfo.checkpointVer; - // pTask->dataRange.range.minVer = pTask->chkInfo.checkpointVer; - // } else { pTask->chkInfo.currentVer = ver; pTask->dataRange.range.maxVer = ver; pTask->dataRange.range.minVer = ver; - //} if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { SStreamTask* pSateTask = pTask; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 378217863a..9eae7c66e7 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -240,8 +240,9 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } int32_t status = pTask->status.taskStatus; + const char* pStatus = streamGetTaskStatusStr(status); if (status != TASK_STATUS__NORMAL) { - tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); + tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, pStatus); streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -252,6 +253,12 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } + if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr); + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + *pScanIdle = false; // seek the stored version and extract data from WAL diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 4ae7ea02b3..e90e974318 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -244,10 +244,10 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) { uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId)); - ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); - p->chkInfo.checkpointId = p->checkpointingId; - int8_t prev = p->status.taskStatus; + ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); + + p->chkInfo.checkpointId = p->checkpointingId; p->status.taskStatus = TASK_STATUS__NORMAL; // save the task diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9790762c0e..e3839470ff 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -794,13 +794,13 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, return 0; } -// todo record the idle time for dispatch data int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { if (code != TSDB_CODE_SUCCESS) { // dispatch message failed: network error, or node not available. // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure - // happened too fast. todo handle the shuffle dispatch failure + // happened too fast. + // todo handle the shuffle dispatch failure qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr, pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); @@ -810,7 +810,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i return TSDB_CODE_SUCCESS; } - qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code); + qDebug("s-task:%s recv dispatch rsp, downstream task input status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, + code); // there are other dispatch message not response yet if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -828,14 +829,12 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // the input queue of the (down stream) task that receive the output data is full, // so the TASK_INPUT_STATUS_BLOCKED is rsp - // todo blocking the output status if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time - - int32_t waitDuration = 300; // 300 ms qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data", - pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration); - streamRetryDispatchStreamBlock(pTask, waitDuration); + pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS); + streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // pipeline send data in output queue // this message has been sent successfully, let's try next one. destroyStreamDataBlock(pTask->msgInfo.pData); @@ -843,8 +842,12 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (pTask->msgInfo.blockingTs != 0) { int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs; - qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%" PRId64 "ms", pTask->id.idStr, el); + qDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", + pTask->id.idStr, pRsp->downstreamTaskId, el); pTask->msgInfo.blockingTs = 0; + + // put data into inputQ of current task is also allowed + pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; } // now ready for next data output diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a07b775cb8..0e1835c492 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -512,7 +512,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask) { } // blocked by downstream task - if (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) { + if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { return false; }