enh(stream): handle event async function.
This commit is contained in:
parent
fc218a1f69
commit
472020a2b9
|
@ -799,7 +799,10 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
|
|||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock);
|
||||
|
||||
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||
|
||||
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
|
||||
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param);
|
||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param);
|
||||
void streamTaskRestoreStatus(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskStop(SStreamTask* pTask);
|
||||
|
|
|
@ -916,6 +916,22 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
|||
}
|
||||
}
|
||||
|
||||
int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) {
|
||||
STQ* pTq = param;
|
||||
|
||||
SStreamMeta* pMeta = pStreamTask->pMeta;
|
||||
STaskId hId = pStreamTask->hTaskInfo.id;
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId);
|
||||
if (pTask == NULL) {
|
||||
// todo handle error
|
||||
}
|
||||
|
||||
doStartFillhistoryStep2(pTask, pStreamTask, pTq);
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// this function should be executed by only one thread, so we set an sentinel to protect this function
|
||||
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
|
||||
|
@ -988,7 +1004,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
streamReExecScanHistoryFuture(pTask, retInfo.idleTime);
|
||||
} else {
|
||||
SStreamTaskState* p = streamTaskGetStatus(pTask);
|
||||
ETaskStatus s = p->state;
|
||||
ETaskStatus s = p->state;
|
||||
|
||||
if (s == TASK_STATUS__PAUSE) {
|
||||
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr,
|
||||
|
@ -1006,37 +1022,34 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
// the following procedure should be executed, no matter status is stop/pause or not
|
||||
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
|
||||
|
||||
if (pTask->info.fillHistory) {
|
||||
SStreamTask* pStreamTask = NULL;
|
||||
ASSERT(pTask->info.fillHistory == 1);
|
||||
|
||||
// 1. get the related stream task
|
||||
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
||||
if (pStreamTask == NULL) {
|
||||
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
|
||||
pTask->streamTaskId.taskId, pTask->id.idStr);
|
||||
// 1. get the related stream task
|
||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
||||
if (pStreamTask == NULL) {
|
||||
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
|
||||
pTask->streamTaskId.taskId, pTask->id.idStr);
|
||||
|
||||
tqDebug("s-task:%s fill-history task set status to be dropping", id);
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
||||
tqDebug("s-task:%s fill-history task set status to be dropping", id);
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
||||
|
||||
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||
|
||||
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
doStartFillhistoryStep2(pTask, pStreamTask, pTq);
|
||||
} else {
|
||||
tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||
|
||||
// code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
|
||||
code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
|
||||
// if (code == TSDB_CODE_SUCCESS) {
|
||||
// doStartFillhistoryStep2(pTask, pStreamTask, pTq);
|
||||
// } else {
|
||||
// tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
|
||||
// }
|
||||
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
|
||||
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
|
|
|
@ -26,21 +26,21 @@ extern "C" {
|
|||
typedef int32_t (*__state_trans_fn)(SStreamTask*);
|
||||
typedef int32_t (*__state_trans_succ_fn)(SStreamTask*);
|
||||
|
||||
typedef struct SAttachedEventInfo {
|
||||
typedef struct SFutureHandleEventInfo {
|
||||
ETaskStatus status; // required status that this event can be handled
|
||||
EStreamTaskEvent event; // the delayed handled event
|
||||
void* pParam;
|
||||
void* pFn;
|
||||
} SAttachedEventInfo;
|
||||
__state_trans_user_fn callBackFn;
|
||||
} SFutureHandleEventInfo;
|
||||
|
||||
typedef struct STaskStateTrans {
|
||||
bool autoInvokeEndFn;
|
||||
SStreamTaskState state;
|
||||
EStreamTaskEvent event;
|
||||
SStreamTaskState next;
|
||||
__state_trans_fn pAction;
|
||||
__state_trans_succ_fn pSuccAction;
|
||||
SAttachedEventInfo attachEvent;
|
||||
bool autoInvokeEndFn;
|
||||
SStreamTaskState state;
|
||||
EStreamTaskEvent event;
|
||||
SStreamTaskState next;
|
||||
__state_trans_fn pAction;
|
||||
__state_trans_succ_fn pSuccAction;
|
||||
SFutureHandleEventInfo attachEvent;
|
||||
} STaskStateTrans;
|
||||
|
||||
struct SStreamTaskSM {
|
||||
|
|
|
@ -385,7 +385,7 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
|
|||
|
||||
void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
||||
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
|
||||
streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
|
||||
streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
|
||||
|
||||
int64_t initTs = pTask->execInfo.init;
|
||||
int64_t startTs = pTask->execInfo.start;
|
||||
|
|
|
@ -63,16 +63,20 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask);
|
|||
|
||||
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
|
||||
__state_trans_fn fn, __state_trans_succ_fn succFn,
|
||||
SAttachedEventInfo* pEventInfo, bool autoInvoke);
|
||||
SFutureHandleEventInfo* pEventInfo, bool autoInvoke);
|
||||
|
||||
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; }
|
||||
|
||||
static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) {
|
||||
static int32_t attachNextHandledEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) {
|
||||
char* p = streamTaskGetStatus(pTask)->name;
|
||||
|
||||
stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
|
||||
GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name);
|
||||
taosArrayPush(pTask->status.pSM->pWaitingEventList, pEvtInfo);
|
||||
|
||||
SArray* pList = pTask->status.pSM->pWaitingEventList;
|
||||
taosArrayPush(pList, pEvtInfo);
|
||||
|
||||
stDebug("s-task:%s add into waiting list, total waiting events:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pList));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -170,9 +174,11 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
|
|||
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
||||
pEventName, el, pSM->prev.state.name, pSM->current.name);
|
||||
|
||||
SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
|
||||
ASSERT(taosArrayGetSize(pSM->pWaitingEventList) == 1);
|
||||
|
||||
// OK, let's handle the attached event, since the task has reached the required status now
|
||||
SFutureHandleEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
|
||||
|
||||
// OK, let's handle the waiting event, since the task has reached the required status now
|
||||
if (pSM->current.state == pEvtInfo->status) {
|
||||
stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr,
|
||||
GET_EVT_NAME(pEvtInfo->event), pSM->current.name);
|
||||
|
@ -189,7 +195,7 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
|
|||
|
||||
code = pNextTrans->pAction(pSM->pTask);
|
||||
if (pNextTrans->autoInvokeEndFn) {
|
||||
return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event);
|
||||
return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event, pEvtInfo->callBackFn, pEvtInfo->pParam);
|
||||
} else {
|
||||
return code;
|
||||
}
|
||||
|
@ -242,7 +248,7 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
pSM->pTask = pTask;
|
||||
pSM->pWaitingEventList = taosArrayInit(4, sizeof(SAttachedEventInfo));
|
||||
pSM->pWaitingEventList = taosArrayInit(4, sizeof(SFutureHandleEventInfo));
|
||||
if (pSM->pWaitingEventList == NULL) {
|
||||
taosMemoryFree(pSM);
|
||||
|
||||
|
@ -273,7 +279,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
|
|||
const char* id = pTask->id.idStr;
|
||||
|
||||
if (pTrans->attachEvent.event != 0) {
|
||||
attachEvent(pTask, &pTrans->attachEvent);
|
||||
attachNextHandledEvent(pTask, &pTrans->attachEvent);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
while (1) {
|
||||
|
@ -303,7 +309,53 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
|
|||
// todo handle error code;
|
||||
|
||||
if (pTrans->autoInvokeEndFn) {
|
||||
streamTaskOnHandleEventSuccess(pSM, event);
|
||||
streamTaskOnHandleEventSuccess(pSM, event, NULL, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t doHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans, __state_trans_user_fn callbackFn, void* param) {
|
||||
SStreamTask* pTask = pSM->pTask;
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
if (pTrans->attachEvent.event != 0) {
|
||||
SFutureHandleEventInfo info = pTrans->attachEvent;
|
||||
info.pParam = param;
|
||||
info.callBackFn = callbackFn;
|
||||
|
||||
attachNextHandledEvent(pTask, &info);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
// while (1) {
|
||||
// // wait for the task to be here
|
||||
// taosThreadMutexLock(&pTask->lock);
|
||||
// ETaskStatus s = streamTaskGetStatus(pTask)->state;
|
||||
// taosThreadMutexUnlock(&pTask->lock);
|
||||
//
|
||||
// if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already
|
||||
// stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event));
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// } else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT) {
|
||||
// stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event));
|
||||
// taosMsleep(100);
|
||||
// } else {
|
||||
// stDebug("s-task:%s is dropped or stopped already, not wait.", id);
|
||||
// return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||
// }
|
||||
// }
|
||||
|
||||
} else { // override current active trans
|
||||
pSM->pActiveTrans = pTrans;
|
||||
pSM->startTs = taosGetTimestampMs();
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
int32_t code = pTrans->pAction(pTask);
|
||||
// todo handle error code;
|
||||
|
||||
if (pTrans->autoInvokeEndFn) {
|
||||
streamTaskOnHandleEventSuccess(pSM, event, NULL, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -349,6 +401,46 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SStreamTask* pTask = pSM->pTask;
|
||||
STaskStateTrans* pTrans = NULL;
|
||||
|
||||
while (1) {
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
|
||||
if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
|
||||
EStreamTaskEvent evt = pSM->pActiveTrans->event;
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
|
||||
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
|
||||
ASSERT(0);
|
||||
taosMsleep(100);
|
||||
} else {
|
||||
// no active event trans exists, handle this event directly
|
||||
pTrans = streamTaskFindTransform(pSM->current.state, event);
|
||||
if (pTrans == NULL) {
|
||||
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||
}
|
||||
|
||||
if (pSM->pActiveTrans != NULL) {
|
||||
// currently in some state transfer procedure, not auto invoke transfer, quit from this prcedure
|
||||
stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
|
||||
pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
|
||||
pSM->pActiveTrans->next.name, GET_EVT_NAME(event));
|
||||
}
|
||||
|
||||
code = doHandleEventAsync(pSM, event, pTrans, callbackFn, param);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static void keepPrevInfo(SStreamTaskSM* pSM) {
|
||||
STaskStateTrans* pTrans = pSM->pActiveTrans;
|
||||
|
||||
|
@ -356,8 +448,9 @@ static void keepPrevInfo(SStreamTaskSM* pSM) {
|
|||
pSM->prev.evt = pTrans->event;
|
||||
}
|
||||
|
||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) {
|
||||
SStreamTask* pTask = pSM->pTask;
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
// do update the task status
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
|
@ -369,16 +462,16 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
|||
s == TASK_STATUS__UNINIT || s == TASK_STATUS__READY);
|
||||
|
||||
// the pSM->prev.evt may be 0, so print string is not appropriate.
|
||||
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr,
|
||||
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
|
||||
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", id, GET_EVT_NAME(event),
|
||||
pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
|
||||
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||
}
|
||||
|
||||
if (pTrans->event != event) {
|
||||
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
|
||||
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event));
|
||||
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", id, GET_EVT_NAME(event),
|
||||
pSM->current.name, GET_EVT_NAME(pTrans->event));
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||
}
|
||||
|
@ -388,16 +481,26 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
|||
pSM->current = pTrans->next;
|
||||
pSM->pActiveTrans = NULL;
|
||||
|
||||
// todo remove it
|
||||
// on success callback, add into lock if necessary, or maybe we should add an option for this?
|
||||
pTrans->pSuccAction(pTask);
|
||||
|
||||
// after handling the callback function assigned by invoker, go on handling the waiting tasks
|
||||
if (callbackFn != NULL) {
|
||||
stDebug("s-task:%s start to handle user-specified callback fn for event:%s", id, GET_EVT_NAME(pTrans->event));
|
||||
callbackFn(pSM->pTask, param);
|
||||
|
||||
stDebug("s-task:%s handle user-specified callback fn for event:%s completed", id, GET_EVT_NAME(pTrans->event));
|
||||
}
|
||||
|
||||
// tasks in waiting list
|
||||
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
|
||||
doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask);
|
||||
} else {
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
int64_t el = (taosGetTimestampMs() - pSM->startTs);
|
||||
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
||||
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", id,
|
||||
GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);
|
||||
}
|
||||
|
||||
|
@ -453,7 +556,7 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,
|
||||
__state_trans_succ_fn succFn, SAttachedEventInfo* pEventInfo, bool autoInvoke) {
|
||||
__state_trans_succ_fn succFn, SFutureHandleEventInfo* pEventInfo, bool autoInvoke) {
|
||||
STaskStateTrans trans = {0};
|
||||
trans.state = StreamTaskStatusList[current];
|
||||
trans.next = StreamTaskStatusList[next];
|
||||
|
@ -497,7 +600,7 @@ void doInitStateTransferTable(void) {
|
|||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
|
||||
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
|
||||
SFutureHandleEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
|
||||
|
||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
|
@ -518,7 +621,7 @@ void doInitStateTransferTable(void) {
|
|||
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
|
||||
info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
|
||||
info = (SFutureHandleEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
|
||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
|
||||
|
|
Loading…
Reference in New Issue