fix(stream): remove the blocking flag after successfully retry sending msg.

This commit is contained in:
Haojun Liao 2023-09-22 16:44:55 +08:00
parent 6c357104a5
commit 5a695f632c
1 changed files with 11 additions and 15 deletions

View File

@ -496,8 +496,9 @@ static void doRetryDispatchData(void* param, void* tmrId) {
}
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d", pTask->id.idStr, waitDuration,
pTask->taskExecInfo.dispatchCount);
pTask->msgInfo.retryCount++;
stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr, waitDuration,
pTask->taskExecInfo.dispatchCount, pTask->msgInfo.retryCount);
if (pTask->launchTaskTimer != NULL) {
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
@ -606,6 +607,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
int32_t retryCount = 0;
pTask->taskExecInfo.dispatchCount += 1;
pTask->msgInfo.startTs = taosGetTimestampMs();
int32_t code = doBuildDispatchMsg(pTask, pBlock);
if (code == 0) {
@ -1038,17 +1040,12 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
pTask->msgInfo.pData = NULL;
if (pTask->msgInfo.startTs != 0) {
int64_t now = taosGetTimestampMs();
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
pTask->id.idStr, downstreamId, el);
int64_t el = now - pTask->msgInfo.startTs;
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
pTask->id.idStr, downstreamId, el);
pTask->msgInfo.startTs = now;
// put data into inputQ of current task is also allowed
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
}
// put data into inputQ of current task is also allowed
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
@ -1137,10 +1134,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, numOfFailed);
}
pTask->msgInfo.retryCount++;
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, pTask->msgInfo.retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else { // this message has been sent successfully, let's try next one.