fix(stream): check error.

This commit is contained in:
Haojun Liao 2023-08-14 19:27:09 +08:00
parent 5d3232d275
commit 1a08ffc79f
1 changed files with 8 additions and 4 deletions

View File

@ -730,10 +730,14 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
// happened too fast.
// todo handle the shuffle dispatch failure
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId,
tstrerror(code), ++pTask->msgInfo.retryCount);
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (ret != TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
qWarn("s-task:%s failed to dispatch msg to task:0x%x, no retry, since it is destroyed already", id);
} else {
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId,
tstrerror(code), ++pTask->msgInfo.retryCount);
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (ret != TSDB_CODE_SUCCESS) {
}
}
return TSDB_CODE_SUCCESS;