From 98758862cf42d403b9f8bcc29729fc594d9acbcf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 13 Jan 2024 00:17:29 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/src/streamStart.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index c8879e59c7..53c40e5b5c 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -866,7 +866,7 @@ SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, in return pInfo; } -static int32_t handleNotBuiltFillHistoryTask(SStreamTask* pTask) { +static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; STaskExecStatisInfo* pExecInfo = &pTask->execInfo; const char* idStr = pTask->id.idStr; @@ -894,8 +894,7 @@ static int32_t handleNotBuiltFillHistoryTask(SStreamTask* pTask) { if (pTask->hTaskInfo.pTimer == NULL) { ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, - pTask->status.timerActive); + stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref); taosMemoryFree(pInfo); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); @@ -940,10 +939,10 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); streamMetaRUnLock(pMeta); - if (pHTask != NULL) { + if (pHTask != NULL) { // it is already added into stream meta store. SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId); if (pHisTask == NULL) { - stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped", idStr); + stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); } else { if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing @@ -957,9 +956,9 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } return TSDB_CODE_SUCCESS; + } else { + return launchNotBuiltFillHistoryTask(pTask); } - - return handleNotBuiltFillHistoryTask(pTask); } int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {