From a25d94ab6f76e806262ceae4c8ced8570051c27a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 Oct 2023 16:40:05 +0800 Subject: [PATCH] fix(stream): retry when error occurs during timer. --- source/libs/stream/src/stream.c | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index b7d57b1728..03ba796b2c 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -58,21 +58,26 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { static void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; + const char* id = pTask->id.idStr; + int32_t nextTrigger = (int32_t)pTask->info.triggerParam; int8_t status = atomic_load_8(&pTask->schedInfo.status); - stDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->info.triggerParam); + stDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { - stDebug("s-task:%s jump out of schedTimer", pTask->id.idStr); + stDebug("s-task:%s jump out of schedTimer", id); return; } if (pTask->status.taskStatus == TASK_STATUS__CK) { - stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", pTask->id.idStr, (int32_t) pTask->info.triggerParam); + stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); } else { if (status == TASK_TRIGGER_STATUS__ACTIVE) { SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); if (pTrigger == NULL) { + stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", + nextTrigger); + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); return; } @@ -80,14 +85,19 @@ static void streamSchedByTimer(void* param, void* tmrId) { pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (pTrigger->pBlock == NULL) { taosFreeQitem(pTrigger); + + stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", + nextTrigger); + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); return; } atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); pTrigger->pBlock->info.type = STREAM_GET_ALL; - if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) { - taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, - &pTask->schedInfo.pTimer); + + int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); + if (code != TSDB_CODE_SUCCESS) { + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); return; } @@ -95,7 +105,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { } } - taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); } int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {