diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 41f260fdc3..3b076fc9c3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -75,6 +75,7 @@ typedef enum ETaskStatus { TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore + TASK_STATUS_STREAM_SCAN_HISTORY, } ETaskStatus; enum { @@ -129,17 +130,18 @@ enum { typedef enum EStreamTaskEvent { TASK_EVENT_INIT = 0x1, - TASK_EVENT_INIT_SCAN_HISTORY = 0x2, - TASK_EVENT_SCANHIST_COMPLETED = 0x3, - TASK_EVENT_START = 0x4, + TASK_EVENT_INIT_SCANHIST = 0x2, + TASK_EVENT_INIT_STREAM_SCANHIST = 0x3, + TASK_EVENT_SCANHIST_DONE = 0x4, TASK_EVENT_STOP = 0x5, TASK_EVENT_GEN_CHECKPOINT = 0x6, - TASK_EVENT_PAUSE = 0x7, - TASK_EVENT_RESUME = 0x8, - TASK_EVENT_HALT = 0x9, - TASK_EVENT_TRANS_STATE = 0xA, - TASK_EVENT_SCAN_TSDB = 0xB, - TASK_EVENT_SCAN_WAL = 0xC, + TASK_EVENT_CHECKPOINT_DONE = 0x7, + TASK_EVENT_PAUSE = 0x8, + TASK_EVENT_RESUME = 0x9, + TASK_EVENT_HALT = 0xA, + TASK_EVENT_TRANS_STATE = 0xB, + TASK_EVENT_SCAN_TSDB = 0xC, + TASK_EVENT_SCAN_WAL = 0xD, } EStreamTaskEvent; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 01dea11235..5e0a81314f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1029,7 +1029,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms bool restored = pTq->pVnode->restored; if (p != NULL && restored && p->info.fillHistory == 0) { - EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCAN_HISTORY; + EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; streamTaskHandleEvent(p->status.pSM, event); } else if (!restored) { tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); @@ -1259,7 +1259,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } #endif - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_COMPLETED); + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); tqScanWalAsync(pTq, false); } streamMetaReleaseTask(pMeta, pStreamTask); @@ -1683,7 +1683,6 @@ FAIL: return -1; } -// todo error code cannot be return, since this is invoked by an mnode-launched transaction. int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -1694,7 +1693,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // disable auto rsp to mnode pRsp->info.handle = NULL; - // todo: add counter to make sure other tasks would not be trapped in checkpoint state SStreamCheckpointSourceReq req = {0}; if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId); @@ -1725,6 +1723,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) } tDecoderClear(&decoder); + // todo handle failure to reset from checkpoint procedure SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, @@ -1735,6 +1734,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) return TSDB_CODE_SUCCESS; } + // todo handle failure to reset from checkpoint procedure // downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req. if (pTask->status.downstreamReady != 1) { pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id @@ -1750,8 +1750,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) return TSDB_CODE_SUCCESS; } + // todo save the checkpoint failed info taosThreadMutexLock(&pTask->lock); - if (pTask->status.taskStatus == TASK_STATUS__HALT) { + if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__HALT) { qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure", pTask->id.idStr, req.checkpointId); taosThreadMutexUnlock(&pTask->lock); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 29de14e3c3..be64ec20a4 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -99,7 +99,7 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) { continue; } - EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCAN_HISTORY; + EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; streamTaskHandleEvent(pTask->status.pSM, event); streamMetaReleaseTask(pMeta, pTask); } diff --git a/source/libs/stream/inc/streamsm.h b/source/libs/stream/inc/streamsm.h index 39ea2a77f8..b1ccc19e23 100644 --- a/source/libs/stream/inc/streamsm.h +++ b/source/libs/stream/inc/streamsm.h @@ -31,14 +31,19 @@ typedef struct SStreamTaskState { typedef int32_t (*__state_trans_fn)(SStreamTask*); typedef int32_t (*__state_trans_succ_fn)(SStreamTask*); -typedef struct STaskStateTrans { - bool autoInvokeEndFn; - SStreamTaskState state; +typedef struct SAttachedEventInfo { + ETaskStatus status; EStreamTaskEvent event; - SStreamTaskState next; - __state_trans_fn preAction; - __state_trans_fn pAction; +} SAttachedEventInfo; + +typedef struct STaskStateTrans { + bool autoInvokeEndFn; + SStreamTaskState state; + EStreamTaskEvent event; + SStreamTaskState next; + __state_trans_fn pAction; __state_trans_succ_fn pSuccAction; + SAttachedEventInfo attachEvent; } STaskStateTrans; struct SStreamTaskSM { @@ -48,6 +53,8 @@ struct SStreamTaskSM { int64_t startTs; SStreamTaskState current; SStreamTaskState prev; + // register the next handled event, if current state is not allowed to handle this event + SArray* eventList; }; typedef struct SStreamEventInfo { @@ -57,7 +64,7 @@ typedef struct SStreamEventInfo { } SStreamEventInfo; SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); - +void* streamDestroyStateMachine(SStreamTaskSM* pSM); #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 8910616f01..91205a216b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -69,7 +69,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { return; } - if (pTask->status.taskStatus == TASK_STATUS__CK) { + if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) { stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); } else { if (status == TASK_TRIGGER_STATUS__ACTIVE) { @@ -267,8 +267,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } // disable the data from upstream tasks - int8_t st = pTask->status.taskStatus; - if (st == TASK_STATUS__HALT) { + if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__HALT) { status = TASK_INPUT_STATUS__BLOCKED; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 2cde368195..7026ac7119 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -135,12 +135,13 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); + // todo this status may not be set here. // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. - pTask->status.taskStatus = TASK_STATUS__CK; + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + pTask->checkpointingId = pReq->checkpointId; pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); pTask->chkInfo.startTs = taosGetTimestampMs(); - pTask->execInfo.checkpoint += 1; // 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task @@ -171,11 +172,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc const char* id = pTask->id.idStr; int32_t code = TSDB_CODE_SUCCESS; - // set the task status - pTask->checkpointingId = checkpointId; - // set task status - pTask->status.taskStatus = TASK_STATUS__CK; + if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) { + pTask->checkpointingId = checkpointId; + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + } { // todo: remove this when the pipeline checkpoint generating is used. SStreamMeta* pMeta = pTask->pMeta; @@ -195,6 +196,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info + atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1); streamProcessCheckpointReadyMsg(pTask); streamFreeQitem((SStreamQueueItem*)pBlock); } @@ -288,12 +290,14 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { continue; } - int8_t prev = p->status.taskStatus; ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); p->chkInfo.checkpointId = p->checkpointingId; streamTaskClearCheckInfo(p); - streamSetStatusNormal(p); + + char* str = NULL; + streamTaskGetStatus(p, &str); + streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); // save the task streamMetaSaveTask(pMeta, p); @@ -302,13 +306,13 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { "vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, " "checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s", pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, - streamGetTaskStatusStr(prev)); + str); } if (streamMetaCommit(pMeta) < 0) { taosWUnLockLatch(&pMeta->lock); stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId, - checkpointId, terrstr()); + checkpointId, terrstr()); return -1; } else { taosWUnLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 39afc3c98a..b339cb6969 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1011,7 +1011,7 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, taosThreadMutexUnlock(&pTask->lock); int32_t num = taosArrayGetSize(pTask->pRspMsgList); - stDebug("s-task:%s add scan history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId, + stDebug("s-task:%s add scan-history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId, num); return TSDB_CODE_SUCCESS; } @@ -1027,7 +1027,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i); tmsgSendRsp(&pInfo->msg); - stDebug("s-task:%s level:%d notify upstream:0x%x continuing scan data in WAL", id, level, pInfo->taskId); + stDebug("s-task:%s level:%d notify upstream:0x%x continuing handle data in WAL", id, level, pInfo->taskId); } taosArrayClear(pTask->pRspMsgList); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ca67e19901..da7e2ece0d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -324,7 +324,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { } else { ASSERT(status == TASK_STATUS__NORMAL); streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); -// pStreamTask->status.taskStatus = TASK_STATUS__HALT; stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); } @@ -375,7 +374,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 7. pause allowed. streamTaskEnablePause(pStreamTask); - if (taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) { + if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) { SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); @@ -492,7 +491,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock); if (code == 0) { streamDispatchStreamBlock(pTask); - } else { + } else { // todo put into queue failed, retry streamFreeQitem((SStreamQueueItem*)pBlock); } } else { // level == TASK_LEVEL__SINK diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2773912437..a80e3a05f0 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -894,7 +894,7 @@ void metaHbToMnode(void* param, void* tmrId) { STaskStatusEntry entry = { .id = *pId, - .status = (*pTask)->status.taskStatus, + .status = streamTaskGetStatus(*pTask, NULL), .nodeId = pMeta->vgId, .stage = pMeta->stage, .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)), diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 8ebc1c0094..003596ce90 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -272,8 +272,12 @@ int32_t onNormalTaskReady(SStreamTask* pTask) { ASSERT(pTask->hTaskInfo.id.taskId == 0); } - stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, - id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader)); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, + id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader)); + } else { + stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p, pTask->status.schedStatus); + } streamTaskEnablePause(pTask); return TSDB_CODE_SUCCESS; @@ -520,11 +524,13 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory int32_t taskLevel = pTask->info.taskLevel; ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK); - char* p = NULL; + const char* id = pTask->id.idStr; + char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); + if (status != TASK_STATUS__SCAN_HISTORY) { stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", - pTask->id.idStr, p, pReq->upstreamTaskId); + id, p, pReq->upstreamTaskId); void* pBuf = NULL; int32_t len = 0; @@ -534,8 +540,8 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len); tmsgSendRsp(&msg); - stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", pTask->id.idStr, - pTask->info.taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId); + stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id, + taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId); return 0; } @@ -547,13 +553,15 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory if (left == 0) { int32_t numOfTasks = taosArrayGetSize(pTask->upstreamInfo.pList); - stDebug( - "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send " - "rsp to all upstream tasks", - pTask->id.idStr, numOfTasks); - - if (pTask->info.taskLevel == TASK_LEVEL__AGG) { + if (taskLevel == TASK_LEVEL__AGG) { + stDebug( + "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data processing " + "and send rsp to all upstream tasks", + id, numOfTasks); streamAggUpstreamScanHistoryFinish(pTask); + } else { + stDebug("s-task:%s all %d upstream task(s) finish scan-history data, and rsp to all upstream tasks", id, + numOfTasks); } // all upstream tasks have completed the scan-history task in the stream time window, let's start to extract data @@ -564,14 +572,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory if (taskLevel == TASK_LEVEL__AGG) { /*int32_t code = */streamTaskScanHistoryDataComplete(pTask); } else { // for sink task, set normal - if (pTask->status.taskStatus != TASK_STATUS__PAUSE && pTask->status.taskStatus != TASK_STATUS__STOP && - pTask->status.taskStatus != TASK_STATUS__DROPPING) { - streamSetStatusNormal(pTask); - } + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); } } else { stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", - pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left); + id, pReq->upstreamTaskId, pReq->childId, left); } return 0; @@ -582,7 +587,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; // execute in the scan history complete call back msg, ready to process data from inputQ - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_COMPLETED); + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); streamTaskSetSchedStatusInactive(pTask); taosWLockLatch(&pMeta->lock); @@ -619,7 +624,7 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) } // check if downstream tasks have been ready - streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCAN_HISTORY); + streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST); } static void tryLaunchHistoryTask(void* param, void* tmrId) { @@ -903,7 +908,7 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; - if (pTask->hTaskInfo.id.taskId == 0) { + if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (pTask->info.fillHistory == 1) { stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64, pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); @@ -949,7 +954,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { // normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { int64_t st = taosGetTimestampMs(); - +#if 0 int8_t status = pTask->status.taskStatus; if (status == TASK_STATUS__DROPPING) { stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); @@ -1007,6 +1012,13 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); taosWUnLockLatch(&pMeta->lock); +#endif + + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE); + + int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); + stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); + // in case of fill-history task, stop the tsdb file scan operation. if (pTask->info.fillHistory == 1) { void* pExecutor = pTask->exec.pExecutor; @@ -1019,6 +1031,21 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { } void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) { + char* p = NULL; + ETaskStatus status = streamTaskGetStatus(pTask, &p); + + if (status == TASK_STATUS__PAUSE) { + streamTaskRestoreStatus(pTask); + + streamTaskGetStatus(pTask, &p); + int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); + stInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, + p, num); + } else { + stDebug("s-task:%s status:%s not in pause status, no need to resume", pTask->id.idStr, p); + } + +#if 0 int8_t status = pTask->status.taskStatus; if (status == TASK_STATUS__PAUSE) { pTask->status.taskStatus = pTask->status.keepTaskStatus; @@ -1031,6 +1058,8 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) { } else { stError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); } +#endif + } // todo fix race condition diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4b39d979b6..b20bd9421c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -304,11 +304,9 @@ static void freeUpstreamItem(void* p) { void tFreeStreamTask(SStreamTask* pTask) { int32_t taskId = pTask->id.taskId; - char* p = NULL; - streamTaskGetStatus(pTask, &p); STaskExecStatisInfo* pStatis = &pTask->execInfo; - stDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState, p); + stDebug("start to free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState); stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64 ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64 @@ -394,6 +392,8 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->pRspMsgList = NULL; } + pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM); + streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo); pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index b4b4d026c2..bb7c168922 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -14,39 +14,63 @@ */ #include "streamInt.h" +#include "streamsm.h" #include "tmisce.h" #include "tstream.h" #include "ttimer.h" #include "wal.h" -#include "streamsm.h" -SStreamTaskState StreamTaskStatusList[8] = { +SStreamTaskState StreamTaskStatusList[9] = { {.state = TASK_STATUS__NORMAL, .name = "normal"}, - {.state = TASK_STATUS__DROPPING, .name = "dropping"}, + {.state = TASK_STATUS__DROPPING, .name = "dropped"}, {.state = TASK_STATUS__UNINIT, .name = "uninit"}, {.state = TASK_STATUS__STOP, .name = "stop"}, {.state = TASK_STATUS__SCAN_HISTORY, .name = "scan-history"}, {.state = TASK_STATUS__HALT, .name = "halt"}, {.state = TASK_STATUS__PAUSE, .name = "paused"}, {.state = TASK_STATUS__CK, .name = "checkpoint"}, + {.state = TASK_STATUS_STREAM_SCAN_HISTORY, .name = "stream-scan-history"}, }; -SStreamEventInfo StreamTaskEventList[8] = { - {}, // dummy event, place holder +SStreamEventInfo StreamTaskEventList[10] = { + {}, // dummy event, place holder {.event = TASK_EVENT_INIT, .name = "initialize"}, - {.event = TASK_EVENT_INIT_SCAN_HISTORY, .name = "scan-history-initialize"}, - {.event = TASK_EVENT_SCANHIST_COMPLETED, .name = "scan-history-completed"}, + {.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-initialize"}, + {.event = TASK_EVENT_SCANHIST_DONE, .name = "scan-history-completed"}, + {.event = TASK_EVENT_STOP, .name = "stopping"}, + {.event = TASK_EVENT_GEN_CHECKPOINT, .name = "checkpoint"}, + {.event = TASK_EVENT_CHECKPOINT_DONE, .name = "checkpoint-done"}, + {.event = TASK_EVENT_PAUSE, .name = "pausing"}, + {.event = TASK_EVENT_RESUME, .name = "resuming"}, + {.event = TASK_EVENT_HALT, .name = "halting"}, }; -static int32_t initStateTransferTable(SStreamTaskSM* pSM); -static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, - __state_trans_fn preFn, __state_trans_fn fn, __state_trans_succ_fn succFn, - bool autoInvoke); static int32_t streamTaskInitStatus(SStreamTask* pTask); static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask); +static int32_t initStateTransferTable(SStreamTaskSM* pSM); + +static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, + __state_trans_fn fn, __state_trans_succ_fn succFn, + SAttachedEventInfo* pEventInfo, bool autoInvoke); static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; } +static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) { + char* p = NULL; + streamTaskGetStatus(pTask, &p); + + stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p, + StreamTaskEventList[pEvtInfo->event].name, StreamTaskStatusList[pEvtInfo->status].name); + + SStreamTaskSM* pSM = pTask->status.pSM; + if (pSM->eventList == NULL) { + + } + + taosArrayPush(pSM->eventList, pEvtInfo); + return 0; +} + int32_t streamTaskInitStatus(SStreamTask* pTask) { pTask->execInfo.init = taosGetTimestampMs(); @@ -57,49 +81,36 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) { } int32_t streamTaskSetReadyForWal(SStreamTask* pTask) { - stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr); - streamSetStatusNormal(pTask); // todo remove it + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr); + } + + streamSetStatusNormal(pTask); // todo remove it return TSDB_CODE_SUCCESS; } -static int32_t streamTaskDoPause(SStreamTask* pTask) { - stDebug("s-task:%s start to pause tasks", pTask->id.idStr); - return 0; -} - -static int32_t streamTaskDoResume(SStreamTask* pTask) { - stDebug("s-task:%s start to resume tasks", pTask->id.idStr); - return 0; -} static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) { stDebug("s-task:%s start to do checkpoint", pTask->id.idStr); return 0; } -int32_t streamTaskWaitBeforeHalt(SStreamTask* pTask) { - char* p = NULL; - while (streamTaskGetStatus(pTask, &p) != TASK_STATUS__NORMAL) { - stDebug("related stream task:%s(status:%s) not ready for halt, wait for 100ms and retry", pTask->id.idStr, p); - taosMsleep(100); - } - - return TSDB_CODE_SUCCESS; -} - int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); - pTask->hTaskInfo.haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - if (pTask->hTaskInfo.haltVer == -1) { - pTask->hTaskInfo.haltVer = pTask->dataRange.range.maxVer + 1; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + pTask->hTaskInfo.haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader); + if (pTask->hTaskInfo.haltVer == -1) { + pTask->hTaskInfo.haltVer = pTask->dataRange.range.maxVer + 1; + } } + return TSDB_CODE_SUCCESS; } // todo optimize the perf of find the trans objs by using hash table static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent event) { int32_t numOfTrans = taosArrayGetSize(pState->pTransList); - for(int32_t i = 0; i < numOfTrans; ++i) { + for (int32_t i = 0; i < numOfTrans; ++i) { STaskStateTrans* pTrans = taosArrayGet(pState->pTransList, i); if (pTrans->state.state == pState->current.state && pTrans->event == event) { return pTrans; @@ -115,6 +126,8 @@ void streamTaskRestoreStatus(SStreamTask* pTask) { taosThreadMutexLock(&pTask->lock); ASSERT(pSM->pActiveTrans == NULL); + ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT); + SStreamTaskState state = pSM->current; pSM->current = pSM->prev; pSM->prev = state; @@ -125,46 +138,89 @@ void streamTaskRestoreStatus(SStreamTask* pTask) { } SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { + const char* id = pTask->id.idStr; + SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM)); if (pSM == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("s-task:%s failed to create task stateMachine, size:%d, code:%s", id, (int32_t)sizeof(SStreamTaskSM), + tstrerror(terrno)); return NULL; } pSM->pTask = pTask; + pSM->eventList = taosArrayInit(4, sizeof(SAttachedEventInfo)); + if (pSM->eventList == NULL) { + taosMemoryFree(pSM); + + terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("s-task:%s failed to create task stateMachine, size:%d, code:%s", id, (int32_t)sizeof(SStreamTaskSM), + tstrerror(terrno)); + return NULL; + } // set the initial state for the state-machine of stream task pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT]; + pSM->startTs = taosGetTimestampMs(); int32_t code = initStateTransferTable(pSM); if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pSM->eventList); taosMemoryFree(pSM); return NULL; } return pSM; } +void* streamDestroyStateMachine(SStreamTaskSM* pSM) { + if (pSM == NULL) { + return NULL; + } + + taosArrayDestroy(pSM->eventList); + taosArrayDestroy(pSM->pTransList); + taosMemoryFree(pSM); + return NULL; +} + int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { + taosThreadMutexLock(&pSM->pTask->lock); + STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event); stDebug("s-task:%s start to handle event:%s, state:%s", pSM->pTask->id.idStr, StreamTaskEventList[event].name, pSM->current.name); - int32_t code = pTrans->preAction(pSM->pTask); + if (pTrans->attachEvent.event != 0) { + attachEvent(pSM->pTask, &pTrans->attachEvent); + taosThreadMutexUnlock(&pSM->pTask->lock); - taosThreadMutexLock(&pSM->pTask->lock); - ASSERT(pSM->pActiveTrans == NULL); - pSM->pActiveTrans = pTrans; - pSM->startTs = taosGetTimestampMs(); - taosThreadMutexUnlock(&pSM->pTask->lock); + while (1) { + // wait for the task to be here + ETaskStatus s = streamTaskGetStatus(pSM->pTask, NULL); + if (s == pTrans->attachEvent.status) { + return TSDB_CODE_SUCCESS; + } else {// this event has been handled already + stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pSM->pTask->id.idStr, + StreamTaskEventList[event].name); + taosMsleep(100); + } + } - code = pTrans->pAction(pSM->pTask); - // todo handle error code; + } else { + ASSERT(pSM->pActiveTrans == NULL); + pSM->pActiveTrans = pTrans; + pSM->startTs = taosGetTimestampMs(); + taosThreadMutexUnlock(&pSM->pTask->lock); - if (pTrans->autoInvokeEndFn) { - streamTaskOnHandleEventSuccess(pSM); + int32_t code = pTrans->pAction(pSM->pTask); + // todo handle error code; + + if (pTrans->autoInvokeEndFn) { + streamTaskOnHandleEventSuccess(pSM); + } } - return code; + return TSDB_CODE_SUCCESS; } int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { @@ -181,11 +237,42 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { // on success callback, add into lock if necessary, or maybe we should add an option for this? pTrans->pSuccAction(pTask); - taosThreadMutexUnlock(&pTask->lock); - int64_t el = (taosGetTimestampMs() - pSM->startTs); - stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, - StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name); + if (taosArrayGetSize(pSM->eventList) > 0) { + int64_t el = (taosGetTimestampMs() - pSM->startTs); + stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, + StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name); + + SAttachedEventInfo* pEvtInfo = taosArrayPop(pSM->eventList); + + // OK, let's handle the attached event, since the task has reached the required status now + if (pSM->current.state == pEvtInfo->status) { + stDebug("s-task:%s handle the attached event:%s, state:%s", pTask->id.idStr, + StreamTaskEventList[pEvtInfo->event].name, pSM->current.name); + + STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM, pEvtInfo->event); + ASSERT(pSM->pActiveTrans == NULL); + pSM->pActiveTrans = pNextTrans; + pSM->startTs = taosGetTimestampMs(); + + taosThreadMutexUnlock(&pTask->lock); + + int32_t code = pNextTrans->pAction(pSM->pTask); + + if (pTrans->autoInvokeEndFn) { + return streamTaskOnHandleEventSuccess(pSM); + } else { + return code; + } + } + } else { + taosThreadMutexUnlock(&pTask->lock); + + int64_t el = (taosGetTimestampMs() - pSM->startTs); + stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, + StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name); + } + return TSDB_CODE_SUCCESS; } @@ -197,17 +284,19 @@ ETaskStatus streamTaskGetStatus(SStreamTask* pTask, char** pStr) { return s.state; } -STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, - __state_trans_fn preFn, __state_trans_fn fn, __state_trans_succ_fn succFn, - bool autoInvoke) { +STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, + __state_trans_succ_fn succFn, SAttachedEventInfo* pEventInfo, bool autoInvoke) { STaskStateTrans trans = {0}; trans.state = StreamTaskStatusList[current]; trans.next = StreamTaskStatusList[next]; trans.event = event; - trans.preAction = (preFn != NULL)? preFn:dummyFn; - trans.pAction = (fn != NULL)? fn : dummyFn; - trans.pSuccAction = (succFn != NULL)? succFn:dummyFn; + if (pEventInfo != NULL) { + trans.attachEvent = *pEventInfo; + } + + trans.pAction = (fn != NULL) ? fn : dummyFn; + trans.pSuccAction = (succFn != NULL) ? succFn : dummyFn; trans.autoInvokeEndFn = autoInvoke; return trans; } @@ -221,49 +310,83 @@ int32_t initStateTransferTable(SStreamTaskSM* pSM) { } // initialization event handle - STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT, NULL, - streamTaskInitStatus, onNormalTaskReady, false); + STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT, + streamTaskInitStatus, onNormalTaskReady, false, false); taosArrayPush(pSM->pTransList, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCAN_HISTORY, NULL, - streamTaskInitStatus, onScanhistoryTaskReady, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, + streamTaskInitStatus, onScanhistoryTaskReady, false, false); taosArrayPush(pSM->pTransList, &trans); - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_COMPLETED, NULL, - streamTaskSetReadyForWal, NULL, true); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS_STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, + streamTaskInitStatus, onScanhistoryTaskReady, false, false); taosArrayPush(pSM->pTransList, &trans); - // pause & resume related event handle - trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, streamTaskDoPause, NULL, - true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_DONE, + streamTaskSetReadyForWal, NULL, NULL, true); taosArrayPush(pSM->pTransList, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__NORMAL, TASK_EVENT_RESUME, NULL, streamTaskDoResume, - NULL, true); + trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_DONE, + streamTaskSetReadyForWal, NULL, NULL, true); taosArrayPush(pSM->pTransList, &trans); + // halt stream task, from other task status + trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, + streamTaskKeepCurrentVerInWal, NULL, true); + taosArrayPush(pSM->pTransList, &trans); + + SAttachedEventInfo info = {.status = TASK_STATUS__NORMAL, .event = TASK_EVENT_HALT}; + trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, + streamTaskKeepCurrentVerInWal, &info, true); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, + &info, true); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, + streamTaskKeepCurrentVerInWal, NULL, true); + taosArrayPush(pSM->pTransList, &trans); + + // checkpoint related event trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true); taosArrayPush(pSM->pTransList, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + trans = + createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true); taosArrayPush(pSM->pTransList, &trans); - // halt stream task, from other task status - trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, NULL, - streamTaskKeepCurrentVerInWal, true); + // pause & resume related event handle + trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); taosArrayPush(pSM->pTransList, &trans); - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, streamTaskWaitBeforeHalt, - NULL, streamTaskKeepCurrentVerInWal, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); taosArrayPush(pSM->pTransList, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, streamTaskWaitBeforeHalt, NULL, - streamTaskKeepCurrentVerInWal, true); + info = (SAttachedEventInfo){.status = TASK_STATUS__NORMAL, .event = TASK_EVENT_PAUSE}; + trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); taosArrayPush(pSM->pTransList, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, NULL, - streamTaskKeepCurrentVerInWal, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + taosArrayPush(pSM->pTransList, &trans); + + // resume is completed by restore status of state-machine + return 0; } \ No newline at end of file