fix(stream): transfer state when all downstream rsp already.

This commit is contained in:
Haojun Liao 2023-09-25 17:31:38 +08:00
parent c615c513e8
commit c9576d7a85
2 changed files with 16 additions and 14 deletions

View File

@ -2247,6 +2247,7 @@ static int32_t removeInvalidStreamTask(SArray *pNodeSnapshot) {
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
execNodeList.pNodeEntryList = pValidNodeEntryList; execNodeList.pNodeEntryList = pValidNodeEntryList;
taosArrayDestroy(pRemoveTaskList);
return 0; return 0;
} }

View File

@ -1111,20 +1111,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId); pRsp->downstreamTaskId, pRsp->downstreamNodeId);
} }
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
}
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
return TSDB_CODE_SUCCESS;
}
} }
int32_t leftRsp = 0; int32_t leftRsp = 0;
@ -1168,6 +1154,21 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else { // this message has been sent successfully, let's try next one. } else { // this message has been sent successfully, let's try next one.
pTask->msgInfo.retryCount = 0; pTask->msgInfo.retryCount = 0;
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
}
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
return TSDB_CODE_SUCCESS;
}
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
} }
} }