From 402aefd95debbfced124faaea9c92fedd4f0f7af Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 Oct 2023 15:09:47 +0800 Subject: [PATCH] refactor(stream): do some internal refactor. --- include/libs/stream/tstream.h | 13 +- source/common/src/tglobal.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 6 +- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 34 +++-- source/dnode/vnode/src/tq/tqSink.c | 6 +- source/dnode/vnode/src/tq/tqStreamTask.c | 15 +- source/libs/stream/inc/streamsm.h | 2 +- source/libs/stream/src/stream.c | 2 +- source/libs/stream/src/streamDispatch.c | 6 +- source/libs/stream/src/streamExec.c | 47 +++--- source/libs/stream/src/streamMeta.c | 22 +-- source/libs/stream/src/streamQueue.c | 4 +- source/libs/stream/src/streamStart.c | 54 +++---- source/libs/stream/src/streamTask.c | 12 +- source/libs/stream/src/streamTaskSm.c | 173 ++++++++++++++--------- 16 files changed, 213 insertions(+), 187 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3b076fc9c3..0a541a34ab 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -67,7 +67,7 @@ enum { }; typedef enum ETaskStatus { - TASK_STATUS__NORMAL = 0, + TASK_STATUS__READY = 0, TASK_STATUS__DROPPING, TASK_STATUS__UNINIT, // not used, an placeholder TASK_STATUS__STOP, @@ -139,7 +139,7 @@ typedef enum EStreamTaskEvent { TASK_EVENT_PAUSE = 0x8, TASK_EVENT_RESUME = 0x9, TASK_EVENT_HALT = 0xA, - TASK_EVENT_TRANS_STATE = 0xB, + TASK_EVENT_DROPPING = 0xB, TASK_EVENT_SCAN_TSDB = 0xC, TASK_EVENT_SCAN_WAL = 0xD, } EStreamTaskEvent; @@ -714,12 +714,14 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t void streamTaskInputFail(SStreamTask* pTask); int32_t streamExecTask(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); -bool streamTaskShouldStop(const SStreamStatus* pStatus); -bool streamTaskShouldPause(const SStreamStatus* pStatus); +bool streamTaskShouldStop(const SStreamTask* pStatus); +bool streamTaskShouldPause(const SStreamTask* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); -ETaskStatus streamTaskGetStatus(SStreamTask* pTask, char** pStr); +ETaskStatus streamTaskGetStatus(const SStreamTask* pTask, char** pStr); +void streamTaskResetStatus(SStreamTask* pTask); +void streamTaskSetStatusReady(SStreamTask* pTask); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); @@ -753,7 +755,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common int32_t streamRestoreParam(SStreamTask* pTask); -int32_t streamSetStatusNormal(SStreamTask* pTask); int32_t streamSetStatusUnint(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 568808405e..4fb02bb021 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -244,7 +244,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointInterval = 300; +int32_t tsStreamCheckpointInterval = 60; float tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 30; int32_t tsTtlUnit = 86400; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f23d596449..0a9e3c5336 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1170,7 +1170,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } } - { // check if all tasks are in TASK_STATUS__NORMAL status + { // check if all tasks are in TASK_STATUS__READY status bool ready = true; taosThreadMutexLock(&execNodeList.lock); @@ -1181,7 +1181,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { continue; } - if (pEntry->status != TASK_STATUS__NORMAL) { + if (pEntry->status != TASK_STATUS__READY) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued", pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status)); ready = false; @@ -2614,7 +2614,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } pEntry->status = p->status; - if (p->status != TASK_STATUS__NORMAL) { + if (p->status != TASK_STATUS__READY) { mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status)); } } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 96274ec102..9aed87d811 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -159,7 +159,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tqStream int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); -int32_t tqCheckAndRunStreamTask(STQ* pTq); +int32_t tqStartStreamTask(STQ* pTq); int32_t tqStartStreamTasks(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5e0a81314f..750a9d942a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -847,11 +847,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId); } - // reset the task status from unfinished transaction - if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr); - pTask->status.taskStatus = TASK_STATUS__NORMAL; - } +// // reset the task status from unfinished transaction +// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { +// tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr); +// pTask->status.taskStatus = TASK_STATUS__READY; +// } streamTaskResetUpstreamStageInfo(pTask); streamSetupScheduleTrigger(pTask); @@ -1029,7 +1029,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms bool restored = pTq->pVnode->restored; if (p != NULL && restored && p->info.fillHistory == 0) { - EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; + EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_STREAM_SCANHIST; streamTaskHandleEvent(p->status.pSM, event); } else if (!restored) { tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); @@ -1118,7 +1118,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamScanHistoryData(pTask); double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; - if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { + if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__PAUSE) { int8_t status = streamTaskSetSchedStatusInactive(pTask); tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status); @@ -1228,7 +1228,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamTaskRestoreStatus(pTask); // if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { -// pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; +// pTask->status.keepTaskStatus = TASK_STATUS__READY; // qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id, // streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus)); // } @@ -1352,7 +1352,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) { - tqCheckAndRunStreamTask(pTq); + tqStartStreamTask(pTq); return 0; } @@ -1365,7 +1365,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { if (pTask != NULL) { // even in halt status, the data in inputQ must be processed char* p = NULL; ETaskStatus st = streamTaskGetStatus(pTask, &p); - if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) { + if (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer); streamExecTask(pTask); @@ -1514,7 +1514,6 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, return -1; } - // todo: handle the case: resume from halt to pause/ from halt to normal/ from pause to normal streamTaskResume(pTask, pTq->pStreamMeta); int32_t level = pTask->info.taskLevel; @@ -1523,8 +1522,8 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, return 0; } - int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) { + ETaskStatus status = streamTaskGetStatus(pTask, NULL); + if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) { // no lock needs to secure the access of the version if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { // discard all the data when the stream task is suspended. @@ -1537,8 +1536,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus); } - if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && - pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { + if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) { streamStartScanHistoryAsync(pTask, igUntreated); } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0)) { tqScanWalAsync(pTq, false); @@ -1867,7 +1865,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s receive nodeEp update msg from mnode", pTask->id.idStr); streamTaskUpdateEpsetInfo(pTask, req.pNodeList); - streamSetStatusNormal(pTask); + streamTaskResetStatus(pTask); SStreamTask** ppHTask = NULL; if (pTask->hTaskInfo.id.taskId != 0) { @@ -1971,10 +1969,10 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); // clear flag set during do checkpoint, and open inputQ for all upstream tasks - if (pTask->status.taskStatus == TASK_STATUS__CK) { + if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) { streamTaskClearCheckInfo(pTask); taosArrayClear(pTask->pReadyMsgList); - streamSetStatusNormal(pTask); + streamTaskSetStatusReady(pTask); } streamMetaReleaseTask(pMeta, pTask); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 7d1c754005..5dafe6a4a0 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -592,7 +592,7 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI const char* id = pTask->id.idStr; while (pTableSinkInfo->uid == 0) { - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(pTask)) { tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName); return TSDB_CODE_STREAM_EXEC_CANCELLED; } @@ -773,7 +773,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { numOfBlocks); for(int32_t i = 0; i < numOfBlocks; ++i) { - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(pTask)) { return; } @@ -823,7 +823,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { bool hasSubmit = false; for (int32_t i = 0; i < numOfBlocks; i++) { - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(pTask)) { taosHashCleanup(pTableIndexMap); tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); return; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index be64ec20a4..8dfa1e2670 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -60,7 +60,7 @@ int32_t tqScanWal(STQ* pTq) { return 0; } -int32_t tqCheckAndRunStreamTask(STQ* pTq) { +int32_t tqStartStreamTask(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -99,7 +99,7 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) { continue; } - EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; + EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_STREAM_SCANHIST; streamTaskHandleEvent(pTask->status.pSM, event); streamMetaReleaseTask(pMeta, pTask); } @@ -240,9 +240,8 @@ int32_t tqStartStreamTasks(STQ* pTq) { STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - int8_t status = (*pTask)->status.taskStatus; - if (status == TASK_STATUS__STOP && (*pTask)->info.fillHistory != 1) { - streamSetStatusNormal(*pTask); + if ((*pTask)->info.fillHistory != 1) { + streamTaskResetStatus(*pTask); } } @@ -327,14 +326,14 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) { // int32_t status = pTask->status.taskStatus; char* p = NULL; int32_t status = streamTaskGetStatus(pTask, &p); - if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__NORMAL) { + if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__READY) { tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, p); return false; } // fill-history task has entered into the last phase, no need to anything if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { - ASSERT(status == TASK_STATUS__NORMAL); + ASSERT(status == TASK_STATUS__READY); // the maximum version of data in the WAL has reached already, the step2 is done tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, pTask->dataRange.range.maxVer); @@ -449,7 +448,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); - if (status != TASK_STATUS__NORMAL) { + if (status != TASK_STATUS__READY) { tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, p); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pStreamMeta, pTask); diff --git a/source/libs/stream/inc/streamsm.h b/source/libs/stream/inc/streamsm.h index b1ccc19e23..19eb3c0029 100644 --- a/source/libs/stream/inc/streamsm.h +++ b/source/libs/stream/inc/streamsm.h @@ -48,7 +48,7 @@ typedef struct STaskStateTrans { struct SStreamTaskSM { SStreamTask* pTask; - SArray* pTransList; // SArray +// SArray* pTransList; // SArray STaskStateTrans* pActiveTrans; int64_t startTs; SStreamTaskState current; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 91205a216b..307e1c47c9 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -64,7 +64,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { int8_t status = atomic_load_8(&pTask->schedInfo.status); stDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); - if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { + if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { stDebug("s-task:%s jump out of schedTimer", id); return; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b339cb6969..e6378d309e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -420,7 +420,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { const char* id = pTask->id.idStr; int32_t msgId = pTask->execInfo.dispatch; - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(pTask)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); return; @@ -474,10 +474,10 @@ static void doRetryDispatchData(void* param, void* tmrId) { } if (code != TSDB_CODE_SUCCESS) { - if (!streamTaskShouldStop(&pTask->status)) { + if (!streamTaskShouldStop(pTask)) { // stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); // atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0); - if (streamTaskShouldPause(&pTask->status)) { + if (streamTaskShouldPause(pTask)) { streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10); } else { streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index da7e2ece0d..346f6cefcb 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -22,14 +22,13 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); -bool streamTaskShouldStop(const SStreamStatus* pStatus) { - int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); - return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); +bool streamTaskShouldStop(const SStreamTask* pTask) { + ETaskStatus s = streamTaskGetStatus(pTask, NULL); + return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING); } -bool streamTaskShouldPause(const SStreamStatus* pStatus) { - int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); - return (status == TASK_STATUS__PAUSE); +bool streamTaskShouldPause(const SStreamTask* pTask) { + return (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__PAUSE); } static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) { @@ -102,7 +101,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i pRes = taosArrayInit(4, sizeof(SSDataBlock)); } - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(pTask)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return 0; } @@ -198,7 +197,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { qSetStreamOpOpen(exec); while (!finished) { - if (streamTaskShouldPause(&pTask->status)) { + if (streamTaskShouldPause(pTask)) { double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); break; @@ -213,7 +212,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { int32_t size = 0; int32_t numOfBlocks = 0; while (1) { - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(pTask)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return 0; } @@ -309,20 +308,18 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pStreamTask->id.idStr); } - ASSERT(((pStreamTask->status.taskStatus == TASK_STATUS__STOP) || - (pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) && + int8_t status = streamTaskGetStatus(pStreamTask, NULL); + ASSERT(((status == TASK_STATUS__DROPPING) || (pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) && pTask->status.appendTranstateBlock == true); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; - // todo. the dropping status should be append to the status after the halt completed. // It must be halted for a source stream task, since when the related scan-history-data task start scan the history // for the step 2. - int8_t status = streamTaskGetStatus(pStreamTask, NULL);//pStreamTask->status.taskStatus; if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { - ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING); + ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP); } else { - ASSERT(status == TASK_STATUS__NORMAL); + ASSERT(status == TASK_STATUS__READY|| status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP); streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); } @@ -333,13 +330,14 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // In case of sink tasks, no need to halt them. // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // start the task state transfer procedure. - // When a task is idle with halt status, all data in inputQ are consumed. + char* p = NULL; + streamTaskGetStatus(pStreamTask, &p); if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 - ", status:normal, sched-status:%d", + ", status:%s, sched-status:%d", pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, - pTimeWindow->ekey, pStreamTask->status.schedStatus); + pTimeWindow->ekey, p, pStreamTask->status.schedStatus); } else { stDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); } @@ -366,6 +364,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 6. save to disk taosWLockLatch(&pMeta->lock); + + pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); streamMetaSaveTask(pMeta, pStreamTask); if (streamMetaCommit(pMeta) < 0) { // persist to disk @@ -525,7 +525,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { int32_t blockSize = 0; int32_t numOfBlocks = 0; SStreamQueueItem* pInput = NULL; - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(pTask)) { stDebug("s-task:%s stream task is stopped", id); break; } @@ -605,8 +605,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { // the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not // be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING. bool streamTaskIsIdle(const SStreamTask* pTask) { - return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || pTask->status.taskStatus == TASK_STATUS__STOP || - pTask->status.taskStatus == TASK_STATUS__DROPPING); + ETaskStatus status = streamTaskGetStatus(pTask, NULL); + return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || status == TASK_STATUS__STOP || + status == TASK_STATUS__DROPPING); } int32_t streamExecTask(SStreamTask* pTask) { @@ -623,8 +624,8 @@ int32_t streamExecTask(SStreamTask* pTask) { } taosThreadMutexLock(&pTask->lock); - if ((streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0) || streamTaskShouldStop(&pTask->status) || - streamTaskShouldPause(&pTask->status)) { + if ((streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0) || streamTaskShouldStop(pTask) || + streamTaskShouldPause(pTask)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); taosThreadMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a80e3a05f0..ee4f1e2340 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -461,7 +461,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL) { - if (!streamTaskShouldStop(&(*ppTask)->status)) { + if (!streamTaskShouldStop(*ppTask)) { int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); taosRUnLockLatch(&pMeta->lock); stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); @@ -478,7 +478,7 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) if (ref > 0) { stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); } else if (ref == 0) { - ASSERT(streamTaskShouldStop(&pTask->status)); + ASSERT(streamTaskShouldStop(pTask)); stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr); tFreeStreamTask(pTask); } else if (ref < 0) { @@ -506,11 +506,15 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; - if (streamTaskShouldPause(&pTask->status)) { + + // desc the paused task counter + if (streamTaskShouldPause(pTask)) { int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); } - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); + + // handle the dropping event + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING); } else { stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); taosWUnLockLatch(&pMeta->lock); @@ -522,8 +526,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t while (1) { taosRLockLatch(&pMeta->lock); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { if ((*ppTask)->status.timerActive == 0) { taosRUnLockLatch(&pMeta->lock); @@ -548,15 +552,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId}; SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId)); if (ppStreamTask != NULL) { - (*ppStreamTask)->hTaskInfo.id.taskId = 0; - (*ppStreamTask)->hTaskInfo.id.streamId = 0; + CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask)); } } else { atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); ASSERT(pTask->status.timerActive == 0); doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); @@ -702,8 +704,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { tFreeStreamTask(pTask); STaskId id = streamTaskExtractKey(pTask); - taosArrayPush(pRecycleList, &id); + int32_t total = taosArrayGetSize(pRecycleList); stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total); continue; @@ -739,7 +741,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); } - if (streamTaskShouldPause(&pTask->status)) { + if (streamTaskShouldPause(pTask)) { atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index e396ac77b4..70a065c22e 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -165,7 +165,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } while (1) { - if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) { stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); return TSDB_CODE_SUCCESS; } @@ -346,7 +346,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc STaosQueue* pQueue = pTask->outputq.queue->pQueue; while (streamQueueIsFull(pTask->outputq.queue)) { - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(pTask)) { stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr); return TSDB_CODE_STREAM_EXEC_CANCELLED; } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 003596ce90..58ea042079 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -263,14 +263,14 @@ int32_t onNormalTaskReady(SStreamTask* pTask) { char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); - ASSERT(status == TASK_STATUS__NORMAL); + ASSERT(status == TASK_STATUS__READY); // todo refactor: remove this later - if (pTask->info.fillHistory == 1) { - stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); - pTask->status.taskStatus = TASK_STATUS__DROPPING; - ASSERT(pTask->hTaskInfo.id.taskId == 0); - } +// if (pTask->info.fillHistory == 1) { +// stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); +// pTask->status.taskStatus = TASK_STATUS__DROPPING; +// ASSERT(pTask->hTaskInfo.id.taskId == 0); +// } if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, @@ -315,7 +315,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask) { int8_t status = pTask->status.taskStatus; const char* str = streamGetTaskStatusStr(status); - ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__NORMAL); + ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__READY); streamTaskSetRangeStreamCalc(pTask); if (status == TASK_STATUS__SCAN_HISTORY) { @@ -341,7 +341,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); const char* id = pTask->id.idStr; - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(pTask)) { stDebug("s-task:%s should stop, do not do check downstream again", id); return TSDB_CODE_SUCCESS; } @@ -447,18 +447,6 @@ int32_t streamRestoreParam(SStreamTask* pTask) { return qRestoreStreamOperatorOption(pTask->exec.pExecutor); } -int32_t streamSetStatusNormal(SStreamTask* pTask) { - int32_t status = atomic_load_8(&pTask->status.taskStatus); - if (status == TASK_STATUS__DROPPING) { - stError("s-task:%s cannot be set normal, since in dropping state", pTask->id.idStr); - return -1; - } else { - stDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); - return 0; - } -} - // source int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow); @@ -636,7 +624,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { if (ppTask) { ASSERT((*ppTask)->status.timerActive >= 1); - if (streamTaskShouldStop(&(*ppTask)->status)) { + if (streamTaskShouldStop(*ppTask)) { char* p = NULL; streamTaskGetStatus((*ppTask), &p); int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); @@ -677,7 +665,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { // abort the timer if intend to stop task SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId); - if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { + if (pHTask == NULL && (!streamTaskShouldStop(pTask))) { char* p = NULL; int32_t hTaskId = pHTaskInfo->id.taskId; @@ -776,7 +764,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { + if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__DROPPING) { return 0; } @@ -1049,7 +1037,7 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) { int8_t status = pTask->status.taskStatus; if (status == TASK_STATUS__PAUSE) { pTask->status.taskStatus = pTask->status.keepTaskStatus; - pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; + pTask->status.keepTaskStatus = TASK_STATUS__READY; int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); stInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { @@ -1065,14 +1053,14 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) { // todo fix race condition void streamTaskDisablePause(SStreamTask* pTask) { // pre-condition check - const char* id = pTask->id.idStr; - while (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - stDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id); - taosMsleep(100); - } - - stDebug("s-task:%s disable task pause", id); - pTask->status.pauseAllowed = 0; +// const char* id = pTask->id.idStr; +// while (pTask->status.taskStatus == TASK_STATUS__PAUSE) { +// stDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id); +// taosMsleep(100); +// } +// +// stDebug("s-task:%s disable task pause", id); +// pTask->status.pauseAllowed = 0; } void streamTaskEnablePause(SStreamTask* pTask) { @@ -1092,7 +1080,7 @@ void streamTaskResumeFromHalt(SStreamTask* pTask) { // } // pTask->status.taskStatus = pTask->status.keepTaskStatus; -// pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; +// pTask->status.keepTaskStatus = TASK_STATUS__READY; streamTaskRestoreStatus(pTask); streamTaskGetStatus(pTask, &p); stDebug("s-task:%s resume from halt, current status:%s", id, p); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b20bd9421c..fdcfcfa9a9 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -58,7 +58,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory pTask->id.idStr = taosStrdup(buf); pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; - pTask->status.taskStatus = (fillHistory || hasFillhistory) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__NORMAL; + pTask->status.taskStatus = (fillHistory || hasFillhistory) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY; pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; @@ -581,13 +581,7 @@ int32_t streamTaskStop(SStreamTask* pTask) { int64_t st = taosGetTimestampMs(); const char* id = pTask->id.idStr; - taosThreadMutexLock(&pTask->lock); - if (pTask->status.taskStatus == TASK_STATUS__CK) { - stDebug("s-task:%s in checkpoint will be discarded since task is stopped", id); - } - pTask->status.taskStatus = TASK_STATUS__STOP; - taosThreadMutexUnlock(&pTask->lock); - + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP); qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) { stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id, @@ -740,7 +734,7 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) { const char* streamGetTaskStatusStr(int32_t status) { switch(status) { - case TASK_STATUS__NORMAL: return "normal"; + case TASK_STATUS__READY: return "normal"; case TASK_STATUS__SCAN_HISTORY: return "scan-history"; case TASK_STATUS__HALT: return "halt"; case TASK_STATUS__PAUSE: return "paused"; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index bb7c168922..49a434af94 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -21,7 +21,7 @@ #include "wal.h" SStreamTaskState StreamTaskStatusList[9] = { - {.state = TASK_STATUS__NORMAL, .name = "normal"}, + {.state = TASK_STATUS__READY, .name = "ready"}, {.state = TASK_STATUS__DROPPING, .name = "dropped"}, {.state = TASK_STATUS__UNINIT, .name = "uninit"}, {.state = TASK_STATUS__STOP, .name = "stop"}, @@ -45,9 +45,13 @@ SStreamEventInfo StreamTaskEventList[10] = { {.event = TASK_EVENT_HALT, .name = "halting"}, }; +static TdThreadOnce streamTaskStateMachineInit = PTHREAD_ONCE_INIT; +static SArray* streamTaskSMTrans = NULL; + static int32_t streamTaskInitStatus(SStreamTask* pTask); static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask); -static int32_t initStateTransferTable(SStreamTaskSM* pSM); +static int32_t initStateTransferTable(); +static void doInitStateTransferTable(void); static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, __state_trans_succ_fn succFn, @@ -61,13 +65,7 @@ static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) { stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p, StreamTaskEventList[pEvtInfo->event].name, StreamTaskStatusList[pEvtInfo->status].name); - - SStreamTaskSM* pSM = pTask->status.pSM; - if (pSM->eventList == NULL) { - - } - - taosArrayPush(pSM->eventList, pEvtInfo); + taosArrayPush(pTask->status.pSM->eventList, pEvtInfo); return 0; } @@ -84,8 +82,6 @@ int32_t streamTaskSetReadyForWal(SStreamTask* pTask) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr); } - - streamSetStatusNormal(pTask); // todo remove it return TSDB_CODE_SUCCESS; } @@ -109,9 +105,9 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { // todo optimize the perf of find the trans objs by using hash table static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent event) { - int32_t numOfTrans = taosArrayGetSize(pState->pTransList); + int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans); for (int32_t i = 0; i < numOfTrans; ++i) { - STaskStateTrans* pTrans = taosArrayGet(pState->pTransList, i); + STaskStateTrans* pTrans = taosArrayGet(streamTaskSMTrans, i); if (pTrans->state.state == pState->current.state && pTrans->event == event) { return pTrans; } @@ -138,6 +134,7 @@ void streamTaskRestoreStatus(SStreamTask* pTask) { } SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { + initStateTransferTable(); const char* id = pTask->id.idStr; SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM)); @@ -161,14 +158,7 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { // set the initial state for the state-machine of stream task pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT]; - pSM->startTs = taosGetTimestampMs(); - int32_t code = initStateTransferTable(pSM); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pSM->eventList); - taosMemoryFree(pSM); - return NULL; - } return pSM; } @@ -178,7 +168,6 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM) { } taosArrayDestroy(pSM->eventList); - taosArrayDestroy(pSM->pTransList); taosMemoryFree(pSM); return NULL; } @@ -276,7 +265,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { return TSDB_CODE_SUCCESS; } -ETaskStatus streamTaskGetStatus(SStreamTask* pTask, char** pStr) { +ETaskStatus streamTaskGetStatus(const SStreamTask* pTask, char** pStr) { SStreamTaskState s = pTask->status.pSM->current; // copy one obj in case of multi-thread environment if (pStr != NULL) { *pStr = s.name; @@ -284,6 +273,28 @@ ETaskStatus streamTaskGetStatus(SStreamTask* pTask, char** pStr) { return s.state; } +void streamTaskResetStatus(SStreamTask* pTask) { + SStreamTaskSM* pSM = pTask->status.pSM; + pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT]; + pSM->pActiveTrans = NULL; + taosArrayClear(pSM->eventList); +} + +void streamTaskSetStatusReady(SStreamTask* pTask) { + SStreamTaskSM* pSM = pTask->status.pSM; + if (pSM->current.state == TASK_STATUS__DROPPING) { + stError("s-task:%s task in dropping state, cannot be set ready", pTask->id.idStr); + return; + } + + pSM->prev = pSM->current; + + pSM->current = StreamTaskStatusList[TASK_STATUS__READY]; + pSM->startTs = taosGetTimestampMs(); + pSM->pActiveTrans = NULL; + taosArrayClear(pSM->eventList); +} + STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, __state_trans_succ_fn succFn, SAttachedEventInfo* pEventInfo, bool autoInvoke) { STaskStateTrans trans = {0}; @@ -301,92 +312,124 @@ STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStr return trans; } -int32_t initStateTransferTable(SStreamTaskSM* pSM) { - if (pSM->pTransList == NULL) { - pSM->pTransList = taosArrayInit(8, sizeof(STaskStateTrans)); - if (pSM->pTransList == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - } +int32_t initStateTransferTable() { + taosThreadOnce(&streamTaskStateMachineInit, doInitStateTransferTable); + return TSDB_CODE_SUCCESS; +} + +void doInitStateTransferTable(void) { + streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans)); // initialization event handle - STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT, + STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false); - taosArrayPush(pSM->pTransList, &trans); + taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); - taosArrayPush(pSM->pTransList, &trans); + taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS_STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); - taosArrayPush(pSM->pTransList, &trans); + taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_DONE, + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, streamTaskSetReadyForWal, NULL, NULL, true); - taosArrayPush(pSM->pTransList, &trans); + taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_DONE, + trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, streamTaskSetReadyForWal, NULL, NULL, true); - taosArrayPush(pSM->pTransList, &trans); + taosArrayPush(streamTaskSMTrans, &trans); // halt stream task, from other task status - trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); - taosArrayPush(pSM->pTransList, &trans); + taosArrayPush(streamTaskSMTrans, &trans); - SAttachedEventInfo info = {.status = TASK_STATUS__NORMAL, .event = TASK_EVENT_HALT}; + SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); - taosArrayPush(pSM->pTransList, &trans); + taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); - taosArrayPush(pSM->pTransList, &trans); + taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); - taosArrayPush(pSM->pTransList, &trans); + taosArrayPush(streamTaskSMTrans, &trans); // checkpoint related event - trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true); - taosArrayPush(pSM->pTransList, &trans); + taosArrayPush(streamTaskSMTrans, &trans); trans = - createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true); - taosArrayPush(pSM->pTransList, &trans); + createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); // pause & resume related event handle - trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); - taosArrayPush(pSM->pTransList, &trans); - + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); - taosArrayPush(pSM->pTransList, &trans); + taosArrayPush(streamTaskSMTrans, &trans); - info = (SAttachedEventInfo){.status = TASK_STATUS__NORMAL, .event = TASK_EVENT_PAUSE}; + info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE}; trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); - taosArrayPush(pSM->pTransList, &trans); - + taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); - taosArrayPush(pSM->pTransList, &trans); - + taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); - taosArrayPush(pSM->pTransList, &trans); - + taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); - taosArrayPush(pSM->pTransList, &trans); + taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); - taosArrayPush(pSM->pTransList, &trans); - + taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); - taosArrayPush(pSM->pTransList, &trans); - + taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); - taosArrayPush(pSM->pTransList, &trans); + taosArrayPush(streamTaskSMTrans, &trans); // resume is completed by restore status of state-machine - return 0; + // stop related event + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + + // dropping related event + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); } \ No newline at end of file