fix(stream): fix bug in delay send.
This commit is contained in:
parent
957ce07dc4
commit
466c80e370
|
@ -1061,7 +1061,11 @@ static void dispatchDataInFuture(void* param, void* tmrId) {
|
|||
// this message has been sent successfully, let's try next one.
|
||||
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
||||
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
||||
|
||||
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||
|
||||
pTask->msgInfo.pData = NULL;
|
||||
pTask->msgInfo.dispatchMsgType = 0;
|
||||
|
||||
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
|
||||
|
||||
|
@ -1078,8 +1082,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
|
|||
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
|
||||
|
||||
// otherwise, continue dispatch the first block to down stream task in pipeline
|
||||
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
|
||||
if (status == TASK_STATUS__CK) {
|
||||
if (delayDispatch) {
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref);
|
||||
if (pTask->msgInfo.pTimer != NULL) {
|
||||
|
|
Loading…
Reference in New Issue