refactor: do some internal refactor.
This commit is contained in:
parent
d5b316839d
commit
7e866c5527
|
@ -1039,14 +1039,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||
|
||||
// code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
|
||||
code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
|
||||
// if (code == TSDB_CODE_SUCCESS) {
|
||||
// doStartFillhistoryStep2(pTask, pStreamTask, pTq);
|
||||
// } else {
|
||||
// tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
|
||||
// }
|
||||
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
|
||||
|
|
|
@ -669,6 +669,13 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* i
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
streamTaskSendCheckpointSourceRsp(pTask);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||
SStreamTask* pTask = NULL;
|
||||
|
||||
|
@ -687,7 +694,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
}
|
||||
|
||||
// handle the dropping event
|
||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING);
|
||||
streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
|
||||
} else {
|
||||
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
|
|
@ -59,15 +59,14 @@ static int32_t streamTaskInitStatus(SStreamTask* pTask);
|
|||
static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask);
|
||||
static int32_t initStateTransferTable();
|
||||
static void doInitStateTransferTable(void);
|
||||
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask);
|
||||
|
||||
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
|
||||
__state_trans_fn fn, __state_trans_succ_fn succFn,
|
||||
SFutureHandleEventInfo* pEventInfo, bool autoInvoke);
|
||||
SFutureHandleEventInfo* pEventInfo);
|
||||
|
||||
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; }
|
||||
|
||||
static int32_t attachNextHandledEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) {
|
||||
static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) {
|
||||
char* p = streamTaskGetStatus(pTask)->name;
|
||||
|
||||
stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
|
||||
|
@ -89,13 +88,6 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask) {
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
streamTaskSendCheckpointSourceRsp(pTask);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
|
||||
if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
stError("s-task:%s no related fill-history task, since it may have been dropped already", pTask->id.idStr);
|
||||
|
@ -274,7 +266,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
|
|||
const char* id = pTask->id.idStr;
|
||||
|
||||
if (pTrans->attachEvent.event != 0) {
|
||||
attachNextHandledEvent(pTask, &pTrans->attachEvent);
|
||||
attachWaitedEvent(pTask, &pTrans->attachEvent);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
while (1) {
|
||||
|
@ -313,34 +305,13 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
|
|||
|
||||
static int32_t doHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans, __state_trans_user_fn callbackFn, void* param) {
|
||||
SStreamTask* pTask = pSM->pTask;
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
if (pTrans->attachEvent.event != 0) {
|
||||
SFutureHandleEventInfo info = pTrans->attachEvent;
|
||||
info.pParam = param;
|
||||
info.callBackFn = callbackFn;
|
||||
|
||||
attachNextHandledEvent(pTask, &info);
|
||||
attachWaitedEvent(pTask, &info);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
// while (1) {
|
||||
// // wait for the task to be here
|
||||
// taosThreadMutexLock(&pTask->lock);
|
||||
// ETaskStatus s = streamTaskGetStatus(pTask)->state;
|
||||
// taosThreadMutexUnlock(&pTask->lock);
|
||||
//
|
||||
// if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already
|
||||
// stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event));
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// } else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT) {
|
||||
// stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event));
|
||||
// taosMsleep(100);
|
||||
// } else {
|
||||
// stDebug("s-task:%s is dropped or stopped already, not wait.", id);
|
||||
// return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||
// }
|
||||
// }
|
||||
|
||||
} else { // override current active trans
|
||||
pSM->pActiveTrans = pTrans;
|
||||
pSM->startTs = taosGetTimestampMs();
|
||||
|
@ -415,13 +386,13 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _
|
|||
// no active event trans exists, handle this event directly
|
||||
pTrans = streamTaskFindTransform(pSM->current.state, event);
|
||||
if (pTrans == NULL) {
|
||||
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
|
||||
stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event), pSM->current.name);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||
}
|
||||
|
||||
if (pSM->pActiveTrans != NULL) {
|
||||
// currently in some state transfer procedure, not auto invoke transfer, quit from this prcedure
|
||||
// currently in some state transfer procedure, not auto invoke transfer, quit from this procedure
|
||||
stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
|
||||
pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
|
||||
pSM->pActiveTrans->next.name, GET_EVT_NAME(event));
|
||||
|
@ -555,7 +526,7 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,
|
||||
__state_trans_succ_fn succFn, SFutureHandleEventInfo* pEventInfo, bool autoInvoke) {
|
||||
__state_trans_succ_fn succFn, SFutureHandleEventInfo* pEventInfo) {
|
||||
STaskStateTrans trans = {0};
|
||||
trans.state = StreamTaskStatusList[current];
|
||||
trans.next = StreamTaskStatusList[next];
|
||||
|
@ -570,7 +541,7 @@ STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStr
|
|||
|
||||
trans.pAction = (fn != NULL) ? fn : dummyFn;
|
||||
trans.pSuccAction = (succFn != NULL) ? succFn : dummyFn;
|
||||
trans.autoInvokeEndFn = autoInvoke;
|
||||
trans.autoInvokeEndFn = (fn == NULL);
|
||||
return trans;
|
||||
}
|
||||
|
||||
|
@ -584,93 +555,93 @@ void doInitStateTransferTable(void) {
|
|||
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
|
||||
|
||||
// initialization event handle
|
||||
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false);
|
||||
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
|
||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
|
||||
// scan-history related event
|
||||
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
|
||||
// halt stream task, from other task status
|
||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
|
||||
SFutureHandleEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
|
||||
|
||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
|
||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
|
||||
// checkpoint related event
|
||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
|
||||
// pause & resume related event handle
|
||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
|
||||
info = (SFutureHandleEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
|
||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
|
||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
|
||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
|
||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
|
||||
// resume is completed by restore status of state-machine
|
||||
|
||||
// stop related event
|
||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
|
||||
// dropping related event
|
||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL, NULL, true);
|
||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
}
|
||||
//clang-format on
|
Loading…
Reference in New Issue