fix(stream): wait for task to be normal and then send data block.
This commit is contained in:
parent
1b89d4f404
commit
c0d3c15528
|
@ -1039,6 +1039,25 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void dispatchDataInFuture(void* param, void* tmrId) {
|
||||||
|
SStreamTask* pTask = param;
|
||||||
|
if (streamTaskShouldStop(pTask)) {
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
|
||||||
|
if (status == TASK_STATUS__CK) {
|
||||||
|
stDebug("s-task:%s in checkpoint status, wait for 500ms to dispatch data downstream", pTask->id.idStr);
|
||||||
|
taosTmrReset(doRetryDispatchData, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer);
|
||||||
|
} else {
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug("s-task:%s start to dispatch data, jump out of timer, ref:%d", pTask->id.idStr, ref);
|
||||||
|
streamDispatchStreamBlock(pTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// this message has been sent successfully, let's try next one.
|
// this message has been sent successfully, let's try next one.
|
||||||
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
||||||
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
||||||
|
@ -1059,7 +1078,19 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
|
||||||
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
|
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
|
||||||
|
|
||||||
// otherwise, continue dispatch the first block to down stream task in pipeline
|
// otherwise, continue dispatch the first block to down stream task in pipeline
|
||||||
streamDispatchStreamBlock(pTask);
|
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
|
||||||
|
if (status == TASK_STATUS__CK) {
|
||||||
|
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref);
|
||||||
|
if (pTask->msgInfo.pTimer != NULL) {
|
||||||
|
taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer);
|
||||||
|
} else {
|
||||||
|
pTask->msgInfo.pTimer = taosTmrStart(dispatchDataInFuture, 500, pTask, streamEnv.timer);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
streamDispatchStreamBlock(pTask);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue