diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 82fefc7e1c..88ba7f995a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -450,8 +450,8 @@ typedef struct SStreamMeta { int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); -SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, - SArray* pTaskList); +SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory, int64_t triggerParam, + SArray* pTaskList, bool hasFillhistory); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 62d5ff47e3..a13c6371b1 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -27,8 +27,8 @@ #define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; -static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, - SVgObj* pVgroup, SEpSet* pEpset, int32_t fillHistory); +static int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, + SEpSet* pEpset, bool isFillhistory); int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark) { @@ -207,8 +207,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { } // create sink node for each vgroup. -int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, SEpSet* pEpset, - int32_t fillHistory) { +int32_t doAddShuffleSinkTask(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, SEpSet* pEpset, bool fillHistory) { SSdb* pSdb = pMnode->pSdb; void* pIter = NULL; @@ -224,17 +223,17 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea continue; } - mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, pEpset, fillHistory); + doAddSinkTask(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, pEpset, fillHistory); sdbRelease(pSdb, pVgroup); } return 0; } -int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, - SEpSet* pEpset, int32_t fillHistory) { - int64_t uid = (fillHistory == 0)? pStream->uid:pStream->hTaskUid; - SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList); +int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, + SEpSet* pEpset, bool isFillhistory) { + int64_t uid = (isFillhistory)? pStream->uid:pStream->hTaskUid; + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, isFillhistory, 0, pTaskList, pStream->conf.fillHistory); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -248,17 +247,16 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p return 0; } -static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, - SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset, - int8_t fillHistory, bool hasExtraSink, int64_t firstWindowSkey) { - SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList); +static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, + SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset, bool fillHistory, + bool hasExtraSink, int64_t firstWindowSkey, bool hasFillHistory) { + SStreamTask* pTask = + tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillHistory); if (pTask == NULL) { return terrno; } epsetAssign(&pTask->info.mnodeEpset, pEpset); - - // todo set the correct ts, which should be last key of queried table. STimeWindow* pWindow = &pTask->dataRange.window; pWindow->skey = INT64_MIN; @@ -345,8 +343,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* // new stream task SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL); - int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, pEpset, - 0, hasExtraSink, nextWindowSkey); + int32_t code = addSourceTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, pEpset, + false, hasExtraSink, nextWindowSkey, pStream->conf.fillHistory); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); return -1; @@ -354,8 +352,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* if (pStream->conf.fillHistory) { SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL); - code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, - pEpset, 1, hasExtraSink, nextWindowSkey); + code = addSourceTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, + pEpset, true, hasExtraSink, nextWindowSkey, false); } sdbRelease(pSdb, pVgroup); @@ -371,10 +369,10 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* return TSDB_CODE_SUCCESS; } -static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t uid, SStreamTask* pDownstreamTask, +static int32_t doAddSourceTask(SArray* pTaskList, bool isFillhistory, int64_t uid, SStreamTask* pDownstreamTask, SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup, SEpSet* pEpset, - int64_t nextWindowSkey) { - SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, 0, pTaskList); + int64_t nextWindowSkey, bool hasFillHistory) { + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, isFillhistory, 0, pTaskList, hasFillHistory); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -400,8 +398,8 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui } static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, - SEpSet* pEpset, int32_t fillHistory, SStreamTask** pAggTask) { - *pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList); + SEpSet* pEpset, bool fillHistory, SStreamTask** pAggTask, bool hasFillhistory) { + *pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillhistory); if (*pAggTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -432,7 +430,8 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan *pAggTask = NULL; SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); - int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, pEpset, 0, pAggTask); + int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, pEpset, false, pAggTask, + pStream->conf.fillHistory); if (code != TSDB_CODE_SUCCESS) { return -1; } @@ -461,7 +460,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan *pHAggTask = NULL; code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pEpset, pStream->conf.fillHistory, - pHAggTask); + pHAggTask, false); if (code != TSDB_CODE_SUCCESS) { if (pSnode != NULL) { sdbRelease(pSdb, pSnode); @@ -520,8 +519,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl continue; } - int32_t code = - doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, pEpset, nextWindowSkey); + int32_t code = doAddSourceTask(pSourceTaskList, false, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, pEpset, + nextWindowSkey, pStream->conf.fillHistory); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); terrno = code; @@ -529,8 +528,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl } if (pStream->conf.fillHistory) { - code = doAddSourceTask(pHSourceTaskList, 1, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup, - pEpset, nextWindowSkey); + code = doAddSourceTask(pHSourceTaskList, true, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup, pEpset, + nextWindowSkey, false); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); return code; @@ -548,16 +547,16 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl } static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList, - SEpSet* pEpset, int32_t fillHistory) { + SEpSet* pEpset, bool fillHistory) { SArray* pSinkTaskList = addNewTaskList(pTasksList); if (pStream->fixedSinkVgId == 0) { - if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream, pEpset, fillHistory) < 0) { + if (doAddShuffleSinkTask(pMnode, pSinkTaskList, pStream, pEpset, fillHistory) < 0) { // TODO free return -1; } } else { - if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, - pEpset, fillHistory) < 0) { + if (doAddSinkTask(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, pEpset, + fillHistory) < 0) { // TODO free return -1; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 57d14d000b..8cbdeff19a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2320,8 +2320,9 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { } SMStreamNodeCheckMsg *pMsg = rpcMallocCont(sizeof(SMStreamNodeCheckMsg)); - SRpcMsg rpcMsg = { - .msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = sizeof(SMStreamNodeCheckMsg)}; + + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = sizeof(SMStreamNodeCheckMsg)}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); return 0; } @@ -2383,7 +2384,6 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } -// todo: this process should be executed by the write queue worker of the mnode int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 03a5233023..f31a16ec85 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -66,7 +66,7 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { pMeta->startInfo.startedAfterNodeUpdate = 0; pMeta->startInfo.elapsedTime = pTask->execInfo.start - pMeta->startInfo.ts; - stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, total elapsed time:%.2f sec", + stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, total elapsed time:%.2fs", vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pMeta->startInfo.elapsedTime / 1000.0); } taosWUnLockLatch(&pMeta->lock); @@ -580,15 +580,21 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { } static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) { - pHTask->dataRange.range.minVer = 0; - // the query version range should be limited to the already processed data - pHTask->dataRange.range.maxVer = pTask->chkInfo.nextProcessVer - 1; + SDataRange* pRange = &pHTask->dataRange; + pRange->range.minVer = 0; + // the query version range should be limited to the already processed data + pRange->range.maxVer = pTask->chkInfo.nextProcessVer - 1; + if (pRange->range.maxVer < pRange->range.minVer) { + pRange->range.maxVer = pRange->range.minVer; + } + + pHTask->execInfo.init = taosGetTimestampMs(); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64 - " ver range:%" PRId64 " - %" PRId64, - pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey, - pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer); + " ver range:%" PRId64 " - %" PRId64", init:%"PRId64, + pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey, + pRange->range.minVer, pRange->range.maxVer, pHTask->execInfo.init); } else { stDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c1ffcda8a5..2907923d03 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -27,8 +27,8 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { return 0; } -SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, - SArray* pTaskList) { +SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory, int64_t triggerParam, + SArray* pTaskList, bool hasFillhistory) { SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -46,10 +46,14 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto pTask->id.idStr = taosStrdup(buf); pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; - pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY; + pTask->status.taskStatus = (fillHistory || hasFillhistory)? TASK_STATUS__SCAN_HISTORY:TASK_STATUS__NORMAL; pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; + if (fillHistory) { + ASSERT(hasFillhistory); + } + addToTaskset(pTaskList, pTask); return pTask; }