diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1b3960bdba..213a0e3c34 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -321,7 +321,7 @@ typedef struct { struct SStreamTask { int64_t ver; - SStreamTaskId id; + SStreamTaskId id; SSTaskBasicInfo info; STaskOutputInfo outputInfo; SDispatchMsgInfo msgInfo; @@ -329,8 +329,8 @@ struct SStreamTask { SCheckpointInfo chkInfo; STaskExec exec; SHistDataRange dataRange; - SStreamTaskId historyTaskId; - SStreamTaskId streamTaskId; + SStreamTaskId historyTaskId; + SStreamTaskId streamTaskId; int32_t nextCheckId; SArray* checkpointInfo; // SArray STaskTimestamp tsInfo; @@ -654,7 +654,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); // recover and fill history -void streamTaskCheckDownstreamTasks(SStreamTask* pTask); +void streamTaskCheckDownstream(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); @@ -718,7 +718,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); int32_t streamMetaCommit(SStreamMeta* pMeta); -int32_t streamLoadTasks(SStreamMeta* pMeta); +int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); // checkpoint diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index bafceb3f5f..06ef01a3f1 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -182,7 +182,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); - streamTaskCheckDownstreamTasks(pTask); + streamTaskCheckDownstream(pTask); return 0; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 93d4b2163d..cce1d55e63 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -45,8 +45,8 @@ extern "C" { typedef struct STqOffsetStore STqOffsetStore; // tqPush -#define EXTRACT_DATA_FROM_WAL_ID (-1) -#define STREAM_TASK_STATUS_CHECK_ID (-2) +#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1) +#define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2) // tqExec typedef struct { @@ -163,8 +163,9 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tqStream int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); -int32_t tqStreamTasksScanWal(STQ* pTq); -int32_t tqStreamTasksStatusCheck(STQ* pTq); +int32_t tqScanWalForStreamTasks(STQ* pTq); +int32_t tqSetStreamTasksReady(STQ* pTq); +int32_t tqStopStreamTasks(STQ* pTq); // tq util int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 5f5c27bfdd..004c840bfe 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -223,11 +223,11 @@ void tqClose(STQ*); int tqPushMsg(STQ*, tmsg_t msgType); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); -int tqStartStreamTasks(STQ* pTq, bool ckPause); // restore all stream tasks after vnode launching completed. +int tqStartStreamTasksAsync(STQ* pTq, bool ckPause); // restore all stream tasks after vnode launching completed. int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqCheckStreamStatus(STQ* pTq); +int32_t tqSetStreamTasksReadyAsync(STQ* pTq); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 87617a6812..c6b87d66a2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -133,7 +133,7 @@ int32_t tqInitialize(STQ* pTq) { return -1; } - if (streamLoadTasks(pTq->pStreamMeta) < 0) { + if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { return -1; } @@ -896,7 +896,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { // reset the task status from unfinished transaction if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - tqWarn("s-task:%s reset task status to be normal, kept in meta status: Paused", pTask->id.idStr); + tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr); pTask->status.taskStatus = TASK_STATUS__NORMAL; } @@ -1052,7 +1052,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms bool restored = pTq->pVnode->restored; if (p != NULL && restored) { - streamTaskCheckDownstreamTasks(p); + streamTaskCheckDownstream(p); } else if (!restored) { tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); } @@ -1199,7 +1199,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamSetStatusNormal(pTask); } - tqStartStreamTasks(pTq, false); + tqStartStreamTasksAsync(pTq, false); } streamMetaReleaseTask(pMeta, pTask); @@ -1320,7 +1320,7 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); if (remain > 0) { - tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, remain:%d not send finish rsp", + tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d", pTask->id.idStr, req.downstreamId, remain); } else { tqDebug( @@ -1340,13 +1340,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int32_t taskId = pReq->taskId; int32_t vgId = TD_VID(pTq->pVnode); - if (taskId == STREAM_TASK_STATUS_CHECK_ID) { - tqStreamTasksStatusCheck(pTq); + if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) { + tqSetStreamTasksReady(pTq); return 0; } - if (taskId == EXTRACT_DATA_FROM_WAL_ID) { // all tasks are extracted submit data from the wal - tqStreamTasksScanWal(pTq); + if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal + tqScanWalForStreamTasks(pTq); return 0; } @@ -1365,7 +1365,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqStartStreamTasks(pTq, false); + tqStartStreamTasksAsync(pTq, false); return 0; } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. // todo add one function to handle this @@ -1505,7 +1505,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { streamStartScanHistoryAsync(pTask, igUntreated); } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { - tqStartStreamTasks(pTq, false); + tqStartStreamTasksAsync(pTq, false); } else { streamSchedExec(pTask); } @@ -1815,7 +1815,7 @@ _end: return -1; } - if (streamLoadTasks(pTq->pStreamMeta) < 0) { + if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { tqError("vgId:%d failed to load stream tasks", vgId); taosWUnLockLatch(&pMeta->lock); return -1; @@ -1824,7 +1824,7 @@ _end: taosWUnLockLatch(&pMeta->lock); if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { vInfo("vgId:%d, restart all stream tasks", vgId); - tqCheckStreamStatus(pTq); + tqSetStreamTasksReadyAsync(pTq); } } } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 8a9b95e045..5a4302a8fb 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -46,7 +46,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) { // 2. the vnode should be the leader. // 3. the stream is not suspended yet. if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) { - tqStartStreamTasks(pTq, true); + tqStartStreamTasksAsync(pTq, true); } return 0; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index b3fbbf5157..5b718be124 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -19,9 +19,8 @@ static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId); -// this function should be executed by stream threads. // extract submit block from WAL, and add them into the input queue for the sources tasks. -int32_t tqStreamTasksScanWal(STQ* pTq) { +int32_t tqScanWalForStreamTasks(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; int64_t st = taosGetTimestampMs(); @@ -57,7 +56,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { return 0; } -int32_t tqStreamTasksStatusCheck(STQ* pTq) { +int32_t tqSetStreamTasksReady(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -80,7 +79,23 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { continue; } - streamTaskCheckDownstreamTasks(pTask); + if (pTask->info.fillHistory == 1) { + streamMetaReleaseTask(pMeta, pTask); + continue; + } + + // todo: how about the fill-history task? + if (pTask->status.downstreamReady == 1) { + tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", + pTask->id.idStr); + streamLaunchFillHistoryTask(pTask); + streamMetaReleaseTask(pMeta, pTask); + continue; + } + + streamSetStatusNormal(pTask); + streamTaskCheckDownstream(pTask); + streamMetaReleaseTask(pMeta, pTask); } @@ -88,7 +103,7 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { return 0; } -int32_t tqCheckStreamStatus(STQ* pTq) { +int32_t tqSetStreamTasksReadyAsync(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -109,10 +124,10 @@ int32_t tqCheckStreamStatus(STQ* pTq) { return -1; } - tqDebug("vgId:%d check for stream tasks status, numOfTasks:%d", vgId, numOfTasks); + tqDebug("vgId:%d check %d stream task(s) status async", vgId, numOfTasks); pRunReq->head.vgId = vgId; pRunReq->streamId = 0; - pRunReq->taskId = STREAM_TASK_STATUS_CHECK_ID; + pRunReq->taskId = STREAM_EXEC_TASK_STATUS_CHECK_ID; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); @@ -121,7 +136,7 @@ int32_t tqCheckStreamStatus(STQ* pTq) { return 0; } -int32_t tqStartStreamTasks(STQ* pTq, bool ckPause) { +int32_t tqStartStreamTasksAsync(STQ* pTq, bool ckPause) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -168,7 +183,7 @@ int32_t tqStartStreamTasks(STQ* pTq, bool ckPause) { tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks); pRunReq->head.vgId = vgId; pRunReq->streamId = 0; - pRunReq->taskId = EXTRACT_DATA_FROM_WAL_ID; + pRunReq->taskId = STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); @@ -177,6 +192,37 @@ int32_t tqStartStreamTasks(STQ* pTq, bool ckPause) { return 0; } +int32_t tqStopStreamTasks(STQ* pTq) { + SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t vgId = TD_VID(pTq->pVnode); + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + + tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks); + + if (numOfTasks == 0) { + return TSDB_CODE_SUCCESS; + } + + SArray* pTaskList = NULL; + taosWLockLatch(&pMeta->lock); + pTaskList = taosArrayDup(pMeta->pTaskList, NULL); + taosWUnLockLatch(&pMeta->lock); + + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); + if (pTask == NULL) { + continue; + } + + streamTaskStop(pTask); + streamMetaReleaseTask(pMeta, pTask); + } + + taosArrayDestroy(pTaskList); + return 0; +} + int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId) { // seek the stored version and extract data from WAL int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 6469045621..61fc3c7ae9 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -177,7 +177,7 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) return code; } -int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamLoadTasks(pWriter->pTq->pStreamMeta); } +int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta); } int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index a71257eddf..44375152d4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -560,10 +560,10 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId); } else { vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); - tqCheckStreamStatus(pVnode->pTq); + tqSetStreamTasksReadyAsync(pVnode->pTq); } } else { - vInfo("vgId:%d, sync restore finished, no launch stream tasks since not leader", vgId); + vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); } } @@ -578,6 +578,8 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) { tsem_post(&pVnode->syncSem); } taosThreadMutexUnlock(&pVnode->lock); + + tqStopStreamTasks(pVnode->pTq); } static void vnodeBecomeLearner(const SSyncFSM *pFsm) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c51ed10c44..64628f8c7f 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -929,7 +929,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i); tmsgSendRsp(&pInfo->msg); - qDebug("s-task:%s level:%d notify upstream:0x%x to continue process data from WAL", pTask->id.idStr, pTask->info.taskLevel, + qDebug("s-task:%s level:%d notify upstream:0x%x to continue process data in WAL", pTask->id.idStr, pTask->info.taskLevel, pInfo->taskId); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f1fb97bc64..471804b6d6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -605,7 +605,7 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { taosArrayDestroy(pRecycleList); } -int32_t streamLoadTasks(SStreamMeta* pMeta) { +int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index be7c1584fd..25ec20d06b 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -26,7 +26,6 @@ typedef struct SStreamTaskRetryInfo { } SStreamTaskRetryInfo; static int32_t streamSetParamForScanHistory(SStreamTask* pTask); -static void launchFillHistoryTask(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); @@ -109,7 +108,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { } // check status -int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { +static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { SHistDataRange* pRange = &pTask->dataRange; STimeWindow* pWindow = &pRange->window; @@ -121,7 +120,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { .stage = pTask->pMeta->stage, }; - // serialize + // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.reqId = tGenIdPI64(); req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; @@ -160,8 +159,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { streamTaskSetReady(pTask, 0); streamTaskSetRangeStreamCalc(pTask); streamTaskLaunchScanHistory(pTask); - - launchFillHistoryTask(pTask); + streamLaunchFillHistoryTask(pTask); } return 0; @@ -242,7 +240,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { } // when current stream task is ready, check the related fill history task. - launchFillHistoryTask(pTask); + streamLaunchFillHistoryTask(pTask); } // todo handle error @@ -437,7 +435,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len); tmsgSendRsp(&msg); - qDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data from WAL", pTask->id.idStr, + qDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", pTask->id.idStr, pTask->info.taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId); return 0; } @@ -504,7 +502,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { +static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) { pHTask->dataRange.range.minVer = 0; pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer; @@ -518,7 +516,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { } // check if downstream tasks have been ready - streamTaskDoCheckDownstreamTasks(pHTask); + doCheckDownstreamStatus(pHTask); } static void tryLaunchHistoryTask(void* param, void* tmrId) { @@ -565,7 +563,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { } if (pHTask != NULL) { - doCheckDownstreamStatus(pTask, pHTask); + checkFillhistoryTaskStatus(pTask, pHTask); streamMetaReleaseTask(pMeta, pHTask); } @@ -582,10 +580,20 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { // todo fix the bug: 2. race condition // an fill history task needs to be started. int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { + int32_t tId = pTask->historyTaskId.taskId; + if (tId == 0) { + return TSDB_CODE_SUCCESS; + } + + ASSERT(pTask->status.downstreamReady == 1); + qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, + pTask->historyTaskId.streamId, tId); + SStreamMeta* pMeta = pTask->pMeta; int32_t hTaskId = pTask->historyTaskId.taskId; int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId}; + // Set the execute conditions, including the query time window and the version range SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (pHTask == NULL) { @@ -612,11 +620,11 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); } - // try again in 500ms + // try again in 100ms return TSDB_CODE_SUCCESS; } - doCheckDownstreamStatus(pTask, *pHTask); + checkFillhistoryTaskStatus(pTask, *pHTask); return TSDB_CODE_SUCCESS; } @@ -786,28 +794,15 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { } } -void launchFillHistoryTask(SStreamTask* pTask) { - int32_t tId = pTask->historyTaskId.taskId; - if (tId == 0) { - return; - } - - ASSERT(pTask->status.downstreamReady == 1); - qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, - pTask->historyTaskId.streamId, tId); - - // launch associated fill history task - streamLaunchFillHistoryTask(pTask); -} - // only the downstream tasks are ready, set the task to be ready to work. -void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { +void streamTaskCheckDownstream(SStreamTask* pTask) { if (pTask->info.fillHistory) { qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); return; } + ASSERT(pTask->status.downstreamReady == 0); - streamTaskDoCheckDownstreamTasks(pTask); + doCheckDownstreamStatus(pTask); } // normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause @@ -882,10 +877,10 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) { pTask->status.taskStatus = pTask->status.keepTaskStatus; pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); - qInfo("vgId:%d s-task:%s resume from pause, status%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); + qInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); - qInfo("vgId:%d s-task:%s sink task.resume from pause, status%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); + qInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); } else { qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); }