From 437eb93a4d77993707508eb07d057bdce71e3286 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 26 Jun 2023 18:44:59 +0800 Subject: [PATCH] fix(stream): fix error while fill history exists. --- source/dnode/mnode/impl/src/mndScheduler.c | 6 ++++-- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamRecover.c | 20 +++++++++++++------- source/libs/stream/src/streamTask.c | 6 +----- 4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 36ed013549..33905bad86 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -554,13 +554,15 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl sdbRelease(pSdb, pVgroup); return code; } - - setHTasksId(pSourceTaskList, pHSourceTaskList); } sdbRelease(pSdb, pVgroup); } + if (pStream->conf.fillHistory) { + setHTasksId(pSourceTaskList, pHSourceTaskList); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index cfbfb40ee6..4768f5aed3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -343,7 +343,7 @@ static void waitForTaskTobeIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { double el = (taosGetTimestampMs() - st) / 1000.0; if (el > 0) { - qDebug("s-task:%s wait for stream task:%s for %.2fs to execute all data in inputQ", pTask->id.idStr, + qDebug("s-task:%s wait for stream task:%s for %.2fs to handle all data in inputQ", pTask->id.idStr, pStreamTask->id.idStr, el); } } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 575cb7105d..9f2d6b5908 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -479,22 +479,28 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { // abort the timer if intend to stop task SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); - if (pHTask == NULL && pTask->status.taskStatus == TASK_STATUS__NORMAL) { - qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it may not be built or have been destroyed", - pTask->id.idStr, pMeta->vgId, pTask->historyTaskId.taskId); + if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { + const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + qWarn( + "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or have been " + "destroyed, or should stop exec", + pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId); taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->timer); + streamMetaReleaseTask(pMeta, pTask); return; } - doCheckDownstreamStatus(pTask, pHTask); + if (pHTask != NULL) { + doCheckDownstreamStatus(pTask, pHTask); + streamMetaReleaseTask(pMeta, pHTask); + } // not in timer anymore pTask->status.timerActive = 0; - streamMetaReleaseTask(pMeta, pHTask); streamMetaReleaseTask(pMeta, pTask); } else { - qError("s-task:0x%x failed to load task", pInfo->taskId); + qError("s-task:0x%x failed to load task, it may have been destoryed", pInfo->taskId); } taosMemoryFree(pInfo); @@ -664,7 +670,7 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) { // launch current task SHistDataRange* pRange = &pTask->dataRange; - int64_t ekey = pRange->window.ekey; + int64_t ekey = pRange->window.ekey + 1; int64_t ver = pRange->range.minVer; pRange->window.skey = ekey; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d6f23bb44a..06da72188c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -42,11 +42,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto pTask->id.idStr = taosStrdup(buf); pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; - if (fillHistory) { - pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY; - } else { - pTask->status.taskStatus = TASK_STATUS__NORMAL; - } + pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;