diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index ae4bc5366d..8be41e8b39 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -264,7 +264,6 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) { int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { taosWLockLatch(&pMeta->lock); - int64_t keys[2]; for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { STaskId* pId = taosArrayGet(pMeta->pTaskList, i); SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); @@ -305,6 +304,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { return TSDB_CODE_SUCCESS; } +// todo: handle the case: during the checkpoint procedure, leader/follower changes happened. int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t code = 0; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 8baf411c83..5c29cb8ad2 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -947,20 +947,56 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { return 0; } +// this message has been sent successfully, let's try next one. +static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { + destroyStreamDataBlock(pTask->msgInfo.pData); + pTask->msgInfo.pData = NULL; + + if (pTask->msgInfo.blockingTs != 0) { + int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs; + stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", + pTask->id.idStr, downstreamId, el); + pTask->msgInfo.blockingTs = 0; + + // put data into inputQ of current task is also allowed + pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; + } + + // now ready for next data output + atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); + + // otherwise, continue dispatch the first block to down stream task in pipeline + streamDispatchStreamBlock(pTask); + return 0; +} + int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { const char* id = pTask->id.idStr; if (code != TSDB_CODE_SUCCESS) { // dispatch message failed: network error, or node not available. - // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set + // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp->inputStatus will be set // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure // happened too fast. // todo handle the shuffle dispatch failure if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore - stWarn("s-task:%s failed to dispatch msg to task:0x%x, no retry, since it is destroyed already", id, pRsp->downstreamTaskId); + stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), no retry, since it is destroyed already", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId); + {// we should set the correct finish flag to make sure the shuffle dispatch will be executed completed. + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + int32_t left = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); + if (left > 0) { // do nothing + stError("s-task:%s add the shuffle dispatch counter to complete the dispatch process", id); + } else { + stError("s-task:%s the last rsp is failed, ignore it and continue, roll-back will discard this msg", id); + handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); + } + } + } } else { - stError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId, - tstrerror(code), ++pTask->msgInfo.retryCount); + stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), code:%s, retry cnt:%d", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code), ++pTask->msgInfo.retryCount); + int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); if (ret != TSDB_CODE_SUCCESS) { } @@ -969,16 +1005,20 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i return TSDB_CODE_SUCCESS; } - stDebug("s-task:%s recv dispatch rsp from 0x%x, downstream task input status:%d code:%d", id, pRsp->downstreamTaskId, - pRsp->inputStatus, code); - // there are other dispatch message not response yet if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); - stDebug("s-task:%s is shuffle, left waiting rsp %d", id, leftRsp); if (leftRsp > 0) { + stDebug("s-task:%s recv dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp", id, pRsp->downstreamTaskId, + pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp); return 0; + } else { + stDebug("s-task:%s recv dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); } + } else { + stDebug("s-task:%s recv fix-dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); } // transtate msg has been sent to downstream successfully. let's transfer the fill-history task state @@ -986,6 +1026,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (p->type == STREAM_INPUT__TRANS_STATE) { stDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id); ASSERT(pTask->info.fillHistory == 1); + code = streamTransferStateToStreamTask(pTask); if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens } @@ -1004,6 +1045,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // so the TASK_INPUT_STATUS_BLOCKED is rsp if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream + double el = 0; if (pTask->msgInfo.blockingTs == 0) { pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time @@ -1018,24 +1060,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // pipeline send data in output queue // this message has been sent successfully, let's try next one. - destroyStreamDataBlock(pTask->msgInfo.pData); - pTask->msgInfo.pData = NULL; - - if (pTask->msgInfo.blockingTs != 0) { - int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs; - stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", id, - pRsp->downstreamTaskId, el); - pTask->msgInfo.blockingTs = 0; - - // put data into inputQ of current task is also allowed - pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; - } - - // now ready for next data output - atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); - - // otherwise, continue dispatch the first block to down stream task in pipeline - streamDispatchStreamBlock(pTask); + handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); } return 0;