fix(stream): retry when error occurs during timer.
This commit is contained in:
parent
7d39164133
commit
a25d94ab6f
|
@ -58,21 +58,26 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
|
||||||
|
|
||||||
static void streamSchedByTimer(void* param, void* tmrId) {
|
static void streamSchedByTimer(void* param, void* tmrId) {
|
||||||
SStreamTask* pTask = (void*)param;
|
SStreamTask* pTask = (void*)param;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
int32_t nextTrigger = (int32_t)pTask->info.triggerParam;
|
||||||
|
|
||||||
int8_t status = atomic_load_8(&pTask->schedInfo.status);
|
int8_t status = atomic_load_8(&pTask->schedInfo.status);
|
||||||
stDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->info.triggerParam);
|
stDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
|
||||||
|
|
||||||
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
|
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
|
||||||
stDebug("s-task:%s jump out of schedTimer", pTask->id.idStr);
|
stDebug("s-task:%s jump out of schedTimer", id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__CK) {
|
if (pTask->status.taskStatus == TASK_STATUS__CK) {
|
||||||
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", pTask->id.idStr, (int32_t) pTask->info.triggerParam);
|
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
|
||||||
} else {
|
} else {
|
||||||
if (status == TASK_TRIGGER_STATUS__ACTIVE) {
|
if (status == TASK_TRIGGER_STATUS__ACTIVE) {
|
||||||
SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
|
SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
|
||||||
if (pTrigger == NULL) {
|
if (pTrigger == NULL) {
|
||||||
|
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
|
||||||
|
nextTrigger);
|
||||||
|
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,14 +85,19 @@ static void streamSchedByTimer(void* param, void* tmrId) {
|
||||||
pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
if (pTrigger->pBlock == NULL) {
|
if (pTrigger->pBlock == NULL) {
|
||||||
taosFreeQitem(pTrigger);
|
taosFreeQitem(pTrigger);
|
||||||
|
|
||||||
|
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
|
||||||
|
nextTrigger);
|
||||||
|
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
|
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
|
||||||
pTrigger->pBlock->info.type = STREAM_GET_ALL;
|
pTrigger->pBlock->info.type = STREAM_GET_ALL;
|
||||||
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) {
|
|
||||||
taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer,
|
int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
|
||||||
&pTask->schedInfo.pTimer);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,7 +105,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
|
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
|
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
|
||||||
|
|
Loading…
Reference in New Issue