From 2dade996bbe44c4f6ed8c56fe7dfec479857a0cc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Sep 2023 15:37:12 +0800 Subject: [PATCH] log(stream): update logs. --- source/libs/stream/src/streamRecover.c | 46 ++------------------------ 1 file changed, 2 insertions(+), 44 deletions(-) diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index d074e115f7..03a5233023 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -32,7 +32,6 @@ typedef struct STaskRecheckInfo { static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); -static int32_t getNextRetryInterval(int32_t waitInterval); static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); static void tryLaunchHistoryTask(void* param, void* tmrId); @@ -453,10 +452,6 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8 return 0; } -int32_t getNextRetryInterval(int32_t waitInterval) { - return waitInterval * RETRY_LAUNCH_INTERVAL_INC_RATE; -} - int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pTranstate == NULL) { @@ -602,43 +597,6 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) doCheckDownstreamStatus(pHTask); } -static bool doLaunchHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo) { - SStreamMeta* pMeta = pTask->pMeta; - streamTaskSetRetryInfoForLaunch(&pTask->hTaskInfo); - - stDebug("s-task:%s try launch related fill-history task in timer, retry:%d", pTask->id.idStr, - pTask->hTaskInfo.retryTimes); - - ASSERT(pTask->status.timerActive >= 1); - - // abort the timer if intend to stop task - SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); - if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { - const char* p = streamGetTaskStatusStr(pTask->status.taskStatus); - stWarn( - "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have " - "been destroyed, or should stop", - pTask->id.idStr, pMeta->vgId, streamGetTaskStatusStr(pTask->status.taskStatus), (int32_t)pTask->hTaskInfo.id.taskId); - - taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pTask->hTaskInfo.pTimer); - streamMetaReleaseTask(pMeta, pTask); - return true; - } - - if (pHTask != NULL) { - checkFillhistoryTaskStatus(pTask, pHTask); - streamMetaReleaseTask(pMeta, pHTask); - } - - // not in timer anymore - int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, - pTask->hTaskInfo.retryTimes, ref); - streamMetaReleaseTask(pMeta, pTask); - - return false; -} - static void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; @@ -678,8 +636,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); streamMetaReleaseTask(pMeta, pTask); - stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task timer, ref:%d", - pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, ref); + stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d", + pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref); pHTaskInfo->id.taskId = 0; pHTaskInfo->id.streamId = 0;