fix(stream): disable retrieving results during checkpoint procedure
This commit is contained in:
parent
af4618f958
commit
7d39164133
|
@ -67,27 +67,32 @@ static void streamSchedByTimer(void* param, void* tmrId) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (status == TASK_TRIGGER_STATUS__ACTIVE) {
|
||||
SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
|
||||
if (pTrigger == NULL) {
|
||||
return;
|
||||
}
|
||||
if (pTask->status.taskStatus == TASK_STATUS__CK) {
|
||||
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", pTask->id.idStr, (int32_t) pTask->info.triggerParam);
|
||||
} else {
|
||||
if (status == TASK_TRIGGER_STATUS__ACTIVE) {
|
||||
SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
|
||||
if (pTrigger == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
pTrigger->type = STREAM_INPUT__GET_RES;
|
||||
pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
if (pTrigger->pBlock == NULL) {
|
||||
taosFreeQitem(pTrigger);
|
||||
return;
|
||||
}
|
||||
pTrigger->type = STREAM_INPUT__GET_RES;
|
||||
pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
if (pTrigger->pBlock == NULL) {
|
||||
taosFreeQitem(pTrigger);
|
||||
return;
|
||||
}
|
||||
|
||||
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
|
||||
pTrigger->pBlock->info.type = STREAM_GET_ALL;
|
||||
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) {
|
||||
taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
|
||||
return;
|
||||
}
|
||||
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
|
||||
pTrigger->pBlock->info.type = STREAM_GET_ALL;
|
||||
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) {
|
||||
taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer,
|
||||
&pTask->schedInfo.pTimer);
|
||||
return;
|
||||
}
|
||||
|
||||
streamSchedExec(pTask);
|
||||
streamSchedExec(pTask);
|
||||
}
|
||||
}
|
||||
|
||||
taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
|
||||
|
|
Loading…
Reference in New Issue