fix(stream): disable retrieving results during checkpoint procedure

This commit is contained in:
Haojun Liao 2023-10-09 16:33:28 +08:00
parent 8266c4ff77
commit be0a2e4573
1 changed files with 23 additions and 18 deletions

View File

@ -67,27 +67,32 @@ static void streamSchedByTimer(void* param, void* tmrId) {
return; return;
} }
if (status == TASK_TRIGGER_STATUS__ACTIVE) { if (pTask->status.taskStatus == TASK_STATUS__CK) {
SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", pTask->id.idStr, (int32_t) pTask->info.triggerParam);
if (pTrigger == NULL) { } else {
return; if (status == TASK_TRIGGER_STATUS__ACTIVE) {
} SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
if (pTrigger == NULL) {
return;
}
pTrigger->type = STREAM_INPUT__GET_RES; pTrigger->type = STREAM_INPUT__GET_RES;
pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pTrigger->pBlock == NULL) { if (pTrigger->pBlock == NULL) {
taosFreeQitem(pTrigger); taosFreeQitem(pTrigger);
return; return;
} }
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
pTrigger->pBlock->info.type = STREAM_GET_ALL; pTrigger->pBlock->info.type = STREAM_GET_ALL;
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) { if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) {
taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer,
return; &pTask->schedInfo.pTimer);
} return;
}
streamSchedExec(pTask); streamSchedExec(pTask);
}
} }
taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);