fix(stream): ignore the dispatch failure, and set the correct rsp counter.
This commit is contained in:
parent
2efd155adf
commit
4925b67d4a
|
@ -264,7 +264,6 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
|
||||||
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
int64_t keys[2];
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
|
||||||
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
||||||
|
@ -305,6 +304,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo: handle the case: during the checkpoint procedure, leader/follower changes happened.
|
||||||
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
|
|
@ -947,20 +947,56 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// this message has been sent successfully, let's try next one.
|
||||||
|
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
||||||
|
destroyStreamDataBlock(pTask->msgInfo.pData);
|
||||||
|
pTask->msgInfo.pData = NULL;
|
||||||
|
|
||||||
|
if (pTask->msgInfo.blockingTs != 0) {
|
||||||
|
int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
|
||||||
|
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
|
||||||
|
pTask->id.idStr, downstreamId, el);
|
||||||
|
pTask->msgInfo.blockingTs = 0;
|
||||||
|
|
||||||
|
// put data into inputQ of current task is also allowed
|
||||||
|
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// now ready for next data output
|
||||||
|
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
|
||||||
|
|
||||||
|
// otherwise, continue dispatch the first block to down stream task in pipeline
|
||||||
|
streamDispatchStreamBlock(pTask);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// dispatch message failed: network error, or node not available.
|
// dispatch message failed: network error, or node not available.
|
||||||
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
|
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp->inputStatus will be set
|
||||||
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
|
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
|
||||||
// happened too fast.
|
// happened too fast.
|
||||||
// todo handle the shuffle dispatch failure
|
// todo handle the shuffle dispatch failure
|
||||||
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
|
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
|
||||||
stWarn("s-task:%s failed to dispatch msg to task:0x%x, no retry, since it is destroyed already", id, pRsp->downstreamTaskId);
|
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), no retry, since it is destroyed already", id,
|
||||||
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||||
|
{// we should set the correct finish flag to make sure the shuffle dispatch will be executed completed.
|
||||||
|
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
int32_t left = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
||||||
|
if (left > 0) { // do nothing
|
||||||
|
stError("s-task:%s add the shuffle dispatch counter to complete the dispatch process", id);
|
||||||
} else {
|
} else {
|
||||||
stError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId,
|
stError("s-task:%s the last rsp is failed, ignore it and continue, roll-back will discard this msg", id);
|
||||||
tstrerror(code), ++pTask->msgInfo.retryCount);
|
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), code:%s, retry cnt:%d", id,
|
||||||
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code), ++pTask->msgInfo.retryCount);
|
||||||
|
|
||||||
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
}
|
}
|
||||||
|
@ -969,16 +1005,20 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("s-task:%s recv dispatch rsp from 0x%x, downstream task input status:%d code:%d", id, pRsp->downstreamTaskId,
|
|
||||||
pRsp->inputStatus, code);
|
|
||||||
|
|
||||||
// there are other dispatch message not response yet
|
// there are other dispatch message not response yet
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
||||||
stDebug("s-task:%s is shuffle, left waiting rsp %d", id, leftRsp);
|
|
||||||
if (leftRsp > 0) {
|
if (leftRsp > 0) {
|
||||||
|
stDebug("s-task:%s recv dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp", id, pRsp->downstreamTaskId,
|
||||||
|
pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp);
|
||||||
return 0;
|
return 0;
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s recv dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", id,
|
||||||
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s recv fix-dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d", id,
|
||||||
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
|
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
|
||||||
|
@ -986,6 +1026,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
if (p->type == STREAM_INPUT__TRANS_STATE) {
|
if (p->type == STREAM_INPUT__TRANS_STATE) {
|
||||||
stDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id);
|
stDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id);
|
||||||
ASSERT(pTask->info.fillHistory == 1);
|
ASSERT(pTask->info.fillHistory == 1);
|
||||||
|
|
||||||
code = streamTransferStateToStreamTask(pTask);
|
code = streamTransferStateToStreamTask(pTask);
|
||||||
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
|
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
|
||||||
}
|
}
|
||||||
|
@ -1004,6 +1045,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
// so the TASK_INPUT_STATUS_BLOCKED is rsp
|
// so the TASK_INPUT_STATUS_BLOCKED is rsp
|
||||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
|
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
|
||||||
|
|
||||||
double el = 0;
|
double el = 0;
|
||||||
if (pTask->msgInfo.blockingTs == 0) {
|
if (pTask->msgInfo.blockingTs == 0) {
|
||||||
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
|
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
|
||||||
|
@ -1018,24 +1060,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||||
} else { // pipeline send data in output queue
|
} else { // pipeline send data in output queue
|
||||||
// this message has been sent successfully, let's try next one.
|
// this message has been sent successfully, let's try next one.
|
||||||
destroyStreamDataBlock(pTask->msgInfo.pData);
|
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
|
||||||
pTask->msgInfo.pData = NULL;
|
|
||||||
|
|
||||||
if (pTask->msgInfo.blockingTs != 0) {
|
|
||||||
int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
|
|
||||||
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", id,
|
|
||||||
pRsp->downstreamTaskId, el);
|
|
||||||
pTask->msgInfo.blockingTs = 0;
|
|
||||||
|
|
||||||
// put data into inputQ of current task is also allowed
|
|
||||||
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
|
|
||||||
}
|
|
||||||
|
|
||||||
// now ready for next data output
|
|
||||||
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
|
|
||||||
|
|
||||||
// otherwise, continue dispatch the first block to down stream task in pipeline
|
|
||||||
streamDispatchStreamBlock(pTask);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue