fix(stream): transfer state when all downstream rsp already.

This commit is contained in:
Haojun Liao 2023-09-25 17:31:38 +08:00
parent a32b56f381
commit 65f66f0986
2 changed files with 16 additions and 14 deletions

View File

@ -2246,6 +2246,7 @@ static int32_t removeInvalidStreamTask(SArray *pNodeSnapshot) {
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
execNodeList.pNodeEntryList = pValidNodeEntryList;
taosArrayDestroy(pRemoveTaskList);
return 0;
}

View File

@ -1111,20 +1111,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
}
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
}
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
return TSDB_CODE_SUCCESS;
}
}
int32_t leftRsp = 0;
@ -1168,6 +1154,21 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else { // this message has been sent successfully, let's try next one.
pTask->msgInfo.retryCount = 0;
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
}
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
return TSDB_CODE_SUCCESS;
}
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
}
}