fix(stream): fix the race condition in dispatching data.
This commit is contained in:
parent
60fa7cccbf
commit
fee9d751d4
|
@ -305,6 +305,7 @@ typedef struct SCheckpointInfo {
|
||||||
int64_t processedVer; // already processed ver, that has generated results version.
|
int64_t processedVer; // already processed ver, that has generated results version.
|
||||||
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
||||||
int64_t failedId; // record the latest failed checkpoint id
|
int64_t failedId; // record the latest failed checkpoint id
|
||||||
|
bool dispatchCheckpointTrigger;
|
||||||
} SCheckpointInfo;
|
} SCheckpointInfo;
|
||||||
|
|
||||||
typedef struct SStreamStatus {
|
typedef struct SStreamStatus {
|
||||||
|
|
|
@ -158,6 +158,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream
|
||||||
|
|
||||||
int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
|
int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
|
ASSERT(pTask->chkInfo.dispatchCheckpointTrigger == false);
|
||||||
streamDispatchStreamBlock(pTask);
|
streamDispatchStreamBlock(pTask);
|
||||||
} else {
|
} else {
|
||||||
stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
|
stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
|
||||||
|
@ -278,6 +279,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) {
|
||||||
pTask->chkInfo.startTs = 0; // clear the recorded start time
|
pTask->chkInfo.startTs = 0; // clear the recorded start time
|
||||||
pTask->checkpointNotReadyTasks = 0;
|
pTask->checkpointNotReadyTasks = 0;
|
||||||
pTask->checkpointAlignCnt = 0;
|
pTask->checkpointAlignCnt = 0;
|
||||||
|
pTask->chkInfo.dispatchCheckpointTrigger = false;
|
||||||
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
|
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -593,6 +593,12 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTask->chkInfo.dispatchCheckpointTrigger) {
|
||||||
|
stDebug("s-task:%s already send checkpoint trigger, not dispatch anymore", id);
|
||||||
|
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
ASSERT(pTask->msgInfo.pData == NULL);
|
ASSERT(pTask->msgInfo.pData == NULL);
|
||||||
stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status);
|
stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status);
|
||||||
|
|
||||||
|
@ -1039,30 +1045,14 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dispatchDataInFuture(void* param, void* tmrId) {
|
|
||||||
SStreamTask* pTask = param;
|
|
||||||
if (streamTaskShouldStop(pTask)) {
|
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
||||||
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
|
|
||||||
if (status == TASK_STATUS__CK) {
|
|
||||||
stDebug("s-task:%s in checkpoint status, wait for 500ms to dispatch data downstream", pTask->id.idStr);
|
|
||||||
taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer);
|
|
||||||
} else {
|
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
||||||
stDebug("s-task:%s start to dispatch data, jump out of timer, ref:%d", pTask->id.idStr, ref);
|
|
||||||
streamDispatchStreamBlock(pTask);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// this message has been sent successfully, let's try next one.
|
// this message has been sent successfully, let's try next one.
|
||||||
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
||||||
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
||||||
|
|
||||||
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||||
|
if (delayDispatch) {
|
||||||
|
pTask->chkInfo.dispatchCheckpointTrigger = true;
|
||||||
|
}
|
||||||
|
|
||||||
pTask->msgInfo.pData = NULL;
|
pTask->msgInfo.pData = NULL;
|
||||||
pTask->msgInfo.dispatchMsgType = 0;
|
pTask->msgInfo.dispatchMsgType = 0;
|
||||||
|
@ -1083,13 +1073,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
|
||||||
|
|
||||||
// otherwise, continue dispatch the first block to down stream task in pipeline
|
// otherwise, continue dispatch the first block to down stream task in pipeline
|
||||||
if (delayDispatch) {
|
if (delayDispatch) {
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
return 0;
|
||||||
stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref);
|
|
||||||
if (pTask->msgInfo.pTimer != NULL) {
|
|
||||||
taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer);
|
|
||||||
} else {
|
|
||||||
pTask->msgInfo.pTimer = taosTmrStart(dispatchDataInFuture, 500, pTask, streamEnv.timer);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
streamDispatchStreamBlock(pTask);
|
streamDispatchStreamBlock(pTask);
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,7 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// checkpoint trigger will be checked
|
||||||
streamDispatchStreamBlock(pTask);
|
streamDispatchStreamBlock(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue