fix(stream):fix error in timerActive.
This commit is contained in:
parent
037a232bfa
commit
d8518de5e0
|
@ -391,8 +391,8 @@ static void doRetryDispatchData(void* param, void* tmrId) {
|
|||
SStreamTask* pTask = param;
|
||||
|
||||
if (streamTaskShouldStop(&pTask->status)) {
|
||||
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
|
||||
int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||
qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -409,17 +409,22 @@ static void doRetryDispatchData(void* param, void* tmrId) {
|
|||
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||
}
|
||||
} else {
|
||||
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
|
||||
int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||
qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
|
||||
}
|
||||
} else {
|
||||
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||
int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||
qDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref);
|
||||
}
|
||||
}
|
||||
|
||||
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
|
||||
qError("s-task:%s dispatch data in %" PRId64 "ms", pTask->id.idStr, waitDuration);
|
||||
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
|
||||
qWarn("s-task:%s dispatch data in %" PRId64 "ms, in timer", pTask->id.idStr, waitDuration);
|
||||
if (pTask->launchTaskTimer != NULL) {
|
||||
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
|
||||
} else {
|
||||
pTask->launchTaskTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
|
||||
|
@ -540,8 +545,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
|
||||
qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms",
|
||||
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
|
||||
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
|
||||
|
||||
qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
|
||||
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
|
||||
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||
break;
|
||||
}
|
||||
|
@ -997,8 +1004,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
|
||||
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
|
||||
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data",
|
||||
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
|
||||
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
|
||||
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data, ref:%d",
|
||||
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, ref);
|
||||
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||
} else { // pipeline send data in output queue
|
||||
// this message has been sent successfully, let's try next one.
|
||||
|
|
Loading…
Reference in New Issue