Merge pull request #23727 from taosdata/fix/pause_stream

fix(stream): wait for task to be normal and then send data block.
This commit is contained in:
Haojun Liao 2023-11-16 19:51:33 +08:00 committed by GitHub
commit 3100edae5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 35 additions and 1 deletions

View File

@ -1039,10 +1039,33 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
return 0;
}
static void dispatchDataInFuture(void* param, void* tmrId) {
SStreamTask* pTask = param;
if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
return;
}
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
if (status == TASK_STATUS__CK) {
stDebug("s-task:%s in checkpoint status, wait for 500ms to dispatch data downstream", pTask->id.idStr);
taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer);
} else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start to dispatch data, jump out of timer, ref:%d", pTask->id.idStr, ref);
streamDispatchStreamBlock(pTask);
}
}
// this message has been sent successfully, let's try next one.
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
pTask->msgInfo.pData = NULL;
pTask->msgInfo.dispatchMsgType = 0;
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
@ -1059,7 +1082,18 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
// otherwise, continue dispatch the first block to down stream task in pipeline
if (delayDispatch) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref);
if (pTask->msgInfo.pTimer != NULL) {
taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer);
} else {
pTask->msgInfo.pTimer = taosTmrStart(dispatchDataInFuture, 500, pTask, streamEnv.timer);
}
} else {
streamDispatchStreamBlock(pTask);
}
return 0;
}