diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 23085c53ee..14517a99d3 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -30,7 +30,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark); -int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream); +int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindowSkey); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index c2e5716c23..36ed013549 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -250,12 +250,11 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas } // todo set the correct ts, which should be last key of queried table. - pTask->dataRange.window.skey = INT64_MIN; - pTask->dataRange.window.ekey = 1685959190000; // taosGetTimestampMs(); - // pTask->dataRange.window.ekey = firstWindowSkey - 1;//taosGetTimestampMs(); + STimeWindow* pWindow = &pTask->dataRange.window; - mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->dataRange.window.skey, - pTask->dataRange.window.ekey); + pWindow->skey = INT64_MIN; + pWindow->ekey = firstWindowSkey - 1; + mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pWindow->skey, pWindow->ekey); // sink or dispatch if (hasExtraSink) { @@ -334,16 +333,16 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { } static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream, - bool hasExtraSink, int64_t lastTs) { + bool hasExtraSink, int64_t nextWindowSkey) { // create exec stream task, since only one level, the exec task is also the source task SArray* pTaskList = addNewTaskList(pStream->tasks); + SSdb* pSdb = pMnode->pSdb; SArray* pHTaskList = NULL; if (pStream->conf.fillHistory) { pHTaskList = addNewTaskList(pStream->pHTasksList); } - SSdb* pSdb = pMnode->pSdb; SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); if (LIST_LENGTH(inner->pNodeList) != 1) { terrno = TSDB_CODE_QRY_INVALID_INPUT; @@ -372,7 +371,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* // new stream task SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL); int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, 0, - hasExtraSink, lastTs); + hasExtraSink, nextWindowSkey); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); return -1; @@ -381,8 +380,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* if (pStream->conf.fillHistory) { SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL); code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, - pStream->conf.fillHistory, hasExtraSink, lastTs); - setHTasksId(pTaskList, pHTaskList); + 1, hasExtraSink, nextWindowSkey); } sdbRelease(pSdb, pVgroup); @@ -391,11 +389,15 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* } } + if (pStream->conf.fillHistory) { + setHTasksId(pTaskList, pHTaskList); + } + return TSDB_CODE_SUCCESS; } static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t uid, SStreamTask* pDownstreamTask, - SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup) { + SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup, int64_t nextWindowSkey) { SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, 0, pTaskList); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -403,11 +405,12 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui } // todo set the correct ts, which should be last key of queried table. - pTask->dataRange.window.skey = INT64_MIN; - pTask->dataRange.window.ekey = 1685959190000; // taosGetTimestampMs(); + STimeWindow* pWindow = &pTask->dataRange.window; + pWindow->skey = INT64_MIN; + pWindow->ekey = nextWindowSkey - 1; mDebug("s-task:0x%x level:%d set time window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->info.taskLevel, - pTask->dataRange.window.skey, pTask->dataRange.window.ekey); + pWindow->skey, pWindow->ekey); // all the source tasks dispatch result to a single agg node. setFixedDownstreamEpInfo(pTask, pDownstreamTask); @@ -507,7 +510,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan } static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPlan, SStreamObj* pStream, - SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask) { + SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask, int64_t nextWindowSkey) { SArray* pSourceTaskList = addNewTaskList(pStream->tasks); SArray* pHSourceTaskList = NULL; @@ -536,7 +539,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl continue; } - int32_t code = doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup); + int32_t code = + doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, nextWindowSkey); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); terrno = code; @@ -544,9 +548,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl } if (pStream->conf.fillHistory) { - code = doAddSourceTask(pHSourceTaskList, pStream->conf.fillHistory, pStream->hTaskUid, pHDownstreamTask, pMnode, - plan, pVgroup); - + code = doAddSourceTask(pHSourceTaskList, 1, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup, + nextWindowSkey); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); return code; @@ -581,7 +584,7 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr return TSDB_CODE_SUCCESS; } -static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t lastTs) { +static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) { SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); @@ -612,7 +615,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* // check for fill history if (pStream->conf.fillHistory) { SArray* pHSinkTaskList = NULL; - code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, pStream->conf.fillHistory); + code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, 1); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -633,22 +636,22 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* } // source level - return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask); + return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey); } else if (numOfPlanLevel == 1) { - return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, hasExtraSink, lastTs); + return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, hasExtraSink, nextWindowSkey); } return 0; } -int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { +int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindowSkey) { SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); if (pPlan == NULL) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - int32_t code = doScheduleStream(pStream, pMnode, pPlan, 0); + int32_t code = doScheduleStream(pStream, pMnode, pPlan, nextWindowSkey); qDestroyQueryPlan(pPlan); return code; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index f0b8ed11be..68b697ca67 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -633,7 +633,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER; - if (mndScheduleStream(pMnode, &streamObj) != 0) goto _OVER; + if (mndScheduleStream(pMnode, &streamObj, 1685959190000) != 0) goto _OVER; if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 433f3fe272..f3bb8336ae 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -784,7 +784,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } // schedule stream task for stream obj - if (mndScheduleStream(pMnode, &streamObj) < 0) { + if (mndScheduleStream(pMnode, &streamObj, createStreamReq.lastTs) < 0) { mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr()); mndTransDrop(pTrans); goto _OVER; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1af5948268..cfbfb40ee6 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -582,13 +582,21 @@ int32_t streamTryExec(SStreamTask* pTask) { } int32_t streamTaskReleaseState(SStreamTask* pTask) { - void* exec = pTask->exec.pExecutor; - int32_t code = qStreamOperatorReleaseState(exec); - return code; + void* pExecutor = pTask->exec.pExecutor; + if (pExecutor != NULL) { + int32_t code = qStreamOperatorReleaseState(pExecutor); + return code; + } else { + return TSDB_CODE_SUCCESS; + } } int32_t streamTaskReloadState(SStreamTask* pTask) { - void* exec = pTask->exec.pExecutor; - int32_t code = qStreamOperatorReloadState(exec); - return code; + void* pExecutor = pTask->exec.pExecutor; + if (pExecutor != NULL) { + int32_t code = qStreamOperatorReloadState(pExecutor); + return code; + } else { + return TSDB_CODE_SUCCESS; + } } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index a033556353..575cb7105d 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -461,9 +461,11 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t)); if (ppTask) { ASSERT((*ppTask)->status.timerActive == 1); + if (streamTaskShouldStop(&(*ppTask)->status)) { - qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, - streamGetTaskStatusStr((*ppTask)->status.taskStatus)); + const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus); + qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus); + (*ppTask)->status.timerActive = 0; taosWUnLockLatch(&pMeta->lock); return; @@ -494,24 +496,27 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { } else { qError("s-task:0x%x failed to load task", pInfo->taskId); } + + taosMemoryFree(pInfo); } // todo fix the bug: 2. race condition // an fill history task needs to be started. int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; + int32_t hTaskId = pTask->historyTaskId.taskId; // Set the execute conditions, including the query time window and the version range - SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId)); + SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId)); if (pHTask == NULL) { qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, - pMeta->vgId, pTask->historyTaskId.taskId); + pMeta->vgId, hTaskId); + + SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo)); + pInfo->taskId = pTask->id.taskId; + pInfo->pMeta = pTask->pMeta; if (pTask->timer == NULL) { - SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo)); - pInfo->taskId = pTask->id.taskId; - pInfo->pMeta = pTask->pMeta; - pTask->timer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer); if (pTask->timer == NULL) { // todo failed to create timer @@ -519,6 +524,10 @@ int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) { pTask->status.timerActive = 1; // timer is active qDebug("s-task:%s set time active flag", pTask->id.idStr); } + } else { // timer exists + pTask->status.timerActive = 1; + qDebug("s-task:%s set time active flag", pTask->id.idStr); + taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->timer); } // try again in 500ms