diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2ce640a2bc..329a6bbc29 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -45,7 +45,7 @@ enum { TASK_STATUS__FAIL, TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner - TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused + TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused, todo remove it? TASK_STATUS__PAUSE, // pause }; @@ -272,6 +272,7 @@ typedef struct SStreamStatus { int8_t keepTaskStatus; bool transferState; int8_t timerActive; // timer is active + int8_t pauseAllowed; // allowed task status to be set to be paused } SStreamStatus; typedef struct SHistDataRange { @@ -557,7 +558,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); -// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); @@ -593,6 +593,9 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); +void streamTaskPause(SStreamTask* pTask); +void streamTaskDisablePause(SStreamTask* pTask); +void streamTaskEnablePause(SStreamTask* pTask); // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 7cb52d2bc9..8984b798ab 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ 
b/source/dnode/mnode/impl/src/mndScheduler.c @@ -25,6 +25,7 @@ #define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; +static int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream); static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory); static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); @@ -267,10 +268,15 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas return terrno; } + for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { + SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); + setTaskUpstreamEpInfo(pTask, pSinkTask); + } + return TSDB_CODE_SUCCESS; } -static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) { +static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); if (pEpInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -295,7 +301,7 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; } -int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) { +int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream) { SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask); if (pEpInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -418,7 +424,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui return -1; } - return setEpToDownstreamTask(pTask, pDownstreamTask); + return setTaskUpstreamEpInfo(pTask, pDownstreamTask); } static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, @@ -586,6 +592,14 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr return TSDB_CODE_SUCCESS; } +static void 
setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpstreamTask) { + SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL); + for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { + SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); + setTaskUpstreamEpInfo(pUpstreamTask, pSinkTask); + } +} + static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) { SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); @@ -637,6 +651,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* return code; } + setSinkTaskUpstreamInfo(pStream->tasks, pAggTask); + setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask); + // source level return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey); } else if (numOfPlanLevel == 1) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 81176f8929..201c621ab3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -914,6 +914,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); } + // reset the task status from unfinished transaction + if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { + tqWarn("s-task:%s reset task status to be normal, kept in meta status: Paused", pTask->id.idStr); + pTask->status.taskStatus = TASK_STATUS__NORMAL; + } + streamSetupScheduleTrigger(pTask); tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 @@ -1031,9 +1037,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SStreamMeta* pStreamMeta = pTq->pStreamMeta; // 2.save task, use the newest commit version as the initial start version of stream task. 
+ int32_t taskId = 0; taosWLockLatch(&pStreamMeta->lock); code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask); + taskId = pTask->id.taskId; int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); if (code < 0) { tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks); @@ -1046,7 +1054,12 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); // 3. It's an fill history task, do nothing. wait for the main task to start it - streamTaskCheckDownstreamTasks(pTask); + SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); + if (p != NULL) { + streamTaskCheckDownstreamTasks(pTask); + } + + streamMetaReleaseTask(pStreamMeta, p); return 0; } @@ -1073,7 +1086,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING); if (schedStatus != TASK_SCHED_STATUS__INACTIVE) { - ASSERT(0); + tqDebug("s-task:%s failed to launch scan history data in current time window, unexpected sched status:%d", id, + schedStatus); + + streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1215,9 +1231,11 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecoderClear(&decoder); + tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); if (pTask == NULL) { - tqError("failed to find task:0x%x, it may have been dropped already", req.taskId); + tqError("failed to find task:0x%x, it may have been dropped already. 
process transfer state failed", req.taskId); return -1; } @@ -1406,17 +1424,6 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL return 0; } -// todo rule out the status when pause not suitable. -static int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) { - if (!streamTaskShouldPause(&pTask->status)) { - tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr); - atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); - } - - return 0; -} - int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; @@ -1425,30 +1432,29 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (pTask == NULL) { tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, pReq->taskId); - return TSDB_CODE_STREAM_TASK_NOT_EXIST; + + // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active + return TSDB_CODE_SUCCESS; } tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr); - - int32_t code = tqProcessTaskPauseImpl(pMeta, pTask); - if (code != 0) { - streamMetaReleaseTask(pMeta, pTask); - return code; - } + streamTaskPause(pTask); SStreamTask* pHistoryTask = NULL; if (pTask->historyTaskId.taskId != 0) { pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); if (pHistoryTask == NULL) { - tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already", pMeta->vgId, - pTask->historyTaskId.taskId); + tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. 
Pause success", + pMeta->vgId, pTask->historyTaskId.taskId); streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_STREAM_TASK_NOT_EXIST; + + // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active + return TSDB_CODE_SUCCESS; } - tqDebug("s-task:%s fill-history task handle pause along with related stream task", pHistoryTask->id.idStr); - code = tqProcessTaskPauseImpl(pMeta, pHistoryTask); + tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); + streamTaskPause(pHistoryTask); } streamMetaReleaseTask(pMeta, pTask); @@ -1456,7 +1462,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg streamMetaReleaseTask(pMeta, pHistoryTask); } - return code; + return TSDB_CODE_SUCCESS; } int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 0a248d0ffe..0654bcf69f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -365,6 +365,10 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { pStreamTask->id.idStr); } + // todo fix race condition + streamTaskDisablePause(pTask); + streamTaskDisablePause(pStreamTask); + ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; @@ -420,6 +424,10 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { } taosWUnLockLatch(&pMeta->lock); + // pause allowed + streamTaskEnablePause(pStreamTask); + streamTaskEnablePause(pTask); + streamSchedExec(pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask); return TSDB_CODE_SUCCESS; @@ -568,22 +576,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { } bool streamTaskIsIdle(const SStreamTask* pTask) { - int32_t numOfItems = taosQueueItemSize(pTask->inputQueue->queue); - if 
(numOfItems > 0) { - return false; - } - - numOfItems = taosQallItemSize(pTask->inputQueue->qall); - if (numOfItems > 0) { - return false; - } - - // blocked by downstream task - if (pTask->outputInfo.status == TASK_OUTPUT_STATUS__BLOCKED) { - return false; - } - - return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE); + return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE/* && pTask->status.taskStatus != TASK_STATUS__HALT*/); } int32_t streamTryExec(SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index ddd7ae4676..d3687c3845 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -72,7 +72,9 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - return doLaunchScanHistoryTask(pTask); + int32_t code = doLaunchScanHistoryTask(pTask); + streamTaskEnablePause(pTask); + return code; } else { ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL); qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr, @@ -86,6 +88,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); } + streamTaskEnablePause(pTask); return 0; } @@ -198,6 +201,11 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); } + // enable pause when init completed. + if (pTask->historyTaskId.taskId == 0 && pTask->info.fillHistory == 0) { + streamTaskEnablePause(pTask); + } + // when current stream task is ready, check the related fill history task. 
launchFillHistoryTask(pTask); } @@ -415,8 +423,8 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { // agg int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) { pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); - qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr, - pTask->numOfWaitingUpstream); + qDebug("s-task:%s agg task wait for %d upstream tasks complete scan-history procedure, status:%s", pTask->id.idStr, + pTask->numOfWaitingUpstream, streamGetTaskStatusStr(pTask->status.taskStatus)); return 0; } @@ -745,7 +753,6 @@ void launchFillHistoryTask(SStreamTask* pTask) { streamLaunchFillHistoryTask(pTask); } -// todo handle race condition, this task may be destroyed void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { if (pTask->info.fillHistory) { qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); @@ -757,3 +764,37 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { // check downstream tasks for itself streamTaskDoCheckDownstreamTasks(pTask); } + +void streamTaskPause(SStreamTask* pTask) { /* polls until status.pauseAllowed is set, then saves the current taskStatus in keepTaskStatus and switches the task to TASK_STATUS__PAUSE */ + SStreamMeta* pMeta = pTask->pMeta; + + int64_t st = taosGetTimestampMs(); + while(!pTask->status.pauseAllowed) { + qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId); + taosMsleep(100); + } + + atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); + + int64_t el = taosGetTimestampMs() - st; /* el is int64_t: it must be formatted with PRId64, passing it to %d is undefined behaviour */ + qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%" PRId64 "ms", pMeta->vgId, pTask->id.idStr, + streamGetTaskStatusStr(pTask->status.keepTaskStatus), el); +} + +// todo fix race condition +void streamTaskDisablePause(SStreamTask* pTask) { /* waits while the task is in TASK_STATUS__PAUSE, then clears status.pauseAllowed */ + // pre-condition check + while (pTask->status.taskStatus == TASK_STATUS__PAUSE) { + taosMsleep(10); + qDebug("s-task:%s already in pause, wait for pause being cancelled, 
and then set pause disabled", pTask->id.idStr); + } + + qDebug("s-task:%s disable task pause", pTask->id.idStr); + pTask->status.pauseAllowed = 0; +} + +void streamTaskEnablePause(SStreamTask* pTask) { /* sets status.pauseAllowed, the flag streamTaskPause() polls before it applies the pause */ + qDebug("s-task:%s enable task pause", pTask->id.idStr); + pTask->status.pauseAllowed = 1; +} \ No newline at end of file