refactor: remove stream-scan-history event for stream task.

This commit is contained in:
Haojun Liao 2023-12-26 00:34:04 +08:00
parent f14c3b93c1
commit 62cfcefbb0
9 changed files with 99 additions and 95 deletions

View File

@ -81,7 +81,7 @@ typedef enum ETaskStatus {
TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause TASK_STATUS__PAUSE, // pause
TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
TASK_STATUS__STREAM_SCAN_HISTORY, // TASK_STATUS__STREAM_SCAN_HISTORY,
} ETaskStatus; } ETaskStatus;
enum { enum {
@ -138,15 +138,14 @@ enum {
typedef enum EStreamTaskEvent { typedef enum EStreamTaskEvent {
TASK_EVENT_INIT = 0x1, TASK_EVENT_INIT = 0x1,
TASK_EVENT_INIT_SCANHIST = 0x2, TASK_EVENT_INIT_SCANHIST = 0x2,
TASK_EVENT_INIT_STREAM_SCANHIST = 0x3, TASK_EVENT_SCANHIST_DONE = 0x3,
TASK_EVENT_SCANHIST_DONE = 0x4, TASK_EVENT_STOP = 0x4,
TASK_EVENT_STOP = 0x5, TASK_EVENT_GEN_CHECKPOINT = 0x5,
TASK_EVENT_GEN_CHECKPOINT = 0x6, TASK_EVENT_CHECKPOINT_DONE = 0x6,
TASK_EVENT_CHECKPOINT_DONE = 0x7, TASK_EVENT_PAUSE = 0x7,
TASK_EVENT_PAUSE = 0x8, TASK_EVENT_RESUME = 0x8,
TASK_EVENT_RESUME = 0x9, TASK_EVENT_HALT = 0x9,
TASK_EVENT_HALT = 0xA, TASK_EVENT_DROPPING = 0xA,
TASK_EVENT_DROPPING = 0xB,
} EStreamTaskEvent; } EStreamTaskEvent;
typedef struct { typedef struct {

View File

@ -2886,38 +2886,38 @@ static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
return NULL; return NULL;
} }
static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) { //static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) {
if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) { // if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) {
if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) { // if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) {
int32_t numOfReady = 0; // int32_t numOfReady = 0;
int32_t numOfTotal = 0; // int32_t numOfTotal = 0;
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { // for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); // STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pTaskEntry->id.streamId == pId->streamId) { // if (pTaskEntry->id.streamId == pId->streamId) {
numOfTotal++; // numOfTotal++;
//
if (pTaskEntry->id.taskId != pId->taskId) { // if (pTaskEntry->id.taskId != pId->taskId) {
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); // STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
if (pEntry->status == TASK_STATUS__READY) { // if (pEntry->status == TASK_STATUS__READY) {
numOfReady++; // numOfReady++;
} // }
} // }
} // }
} // }
//
if (numOfReady > 0) { // if (numOfReady > 0) {
mDebug("stream:0x%" PRIx64 // mDebug("stream:0x%" PRIx64
" %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history task", // " %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history task",
pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady); // pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady);
return true; // return true;
} else { // } else {
return false; // return false;
} // }
} // }
} // }
//
return false; // return false;
} //}
// currently only handle the sink task // currently only handle the sink task
// 1. sink task, drop related fill-history task msg is missing // 1. sink task, drop related fill-history task msg is missing
@ -3091,18 +3091,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (p->status != TASK_STATUS__READY) { if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
if (p->status == TASK_STATUS__STREAM_SCAN_HISTORY) { // if (p->status == TASK_STATUS__STREAM_SCAN_HISTORY) {
bool drop = needDropRelatedFillhistoryTask(pTaskEntry, &execInfo); // bool drop = needDropRelatedFillhistoryTask(pTaskEntry, &execInfo);
if (drop) { // if (drop) {
SStreamObj *pStreamObj = mndGetStreamObj(pMnode, pTaskEntry->id.streamId); // SStreamObj *pStreamObj = mndGetStreamObj(pMnode, pTaskEntry->id.streamId);
if (pStreamObj == NULL) { // if (pStreamObj == NULL) {
mError("failed to acquire the streamObj:0x%" PRIx64 " it may have been dropped", pStreamObj->uid); // mError("failed to acquire the streamObj:0x%" PRIx64 " it may have been dropped", pStreamObj->uid);
} else { // } else {
mndDropRelatedFillhistoryTask(pMnode, pTaskEntry, pStreamObj); // mndDropRelatedFillhistoryTask(pMnode, pTaskEntry, pStreamObj);
mndReleaseStream(pMnode, pStreamObj); // mndReleaseStream(pMnode, pStreamObj);
} // }
} // }
} // }
} }
} }

View File

@ -1037,15 +1037,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);
} else { } else {
STimeWindow* pWindow = &pTask->dataRange.window; ASSERT(0);
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) || streamTaskShouldStop(pTask)); // STimeWindow* pWindow = &pTask->dataRange.window;
// ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) || streamTaskShouldStop(pTask));
// Not update the fill-history time window until the state transfer is completed. //
tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64 // // Not update the fill-history time window until the state transfer is completed.
", window:%" PRId64 " - %" PRId64, // tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64
id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey); // ", window:%" PRId64 " - %" PRId64,
// id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
code = streamTaskScanHistoryDataComplete(pTask); //
// code = streamTaskScanHistoryDataComplete(pTask);
} }
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
@ -1170,7 +1171,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
} else if (status == TASK_STATUS__UNINIT) { } else if (status == TASK_STATUS__UNINIT) {
// todo: fill-history task init ? // todo: fill-history task init ?
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; EStreamTaskEvent event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event); streamTaskHandleEvent(pTask->status.pSM, event);
} }
} }
@ -1363,9 +1364,9 @@ int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask, NULL); ETaskStatus status = streamTaskGetStatus(pTask, NULL);
if (status == TASK_STATUS__STREAM_SCAN_HISTORY) { // if (status == TASK_STATUS__STREAM_SCAN_HISTORY) {
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); // streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
} // }
SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id);

View File

@ -545,7 +545,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char*
SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId); SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId);
if (p != NULL && restored && p->info.fillHistory == 0) { if (p != NULL && restored && p->info.fillHistory == 0) {
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; EStreamTaskEvent event = /*(HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
streamTaskHandleEvent(p->status.pSM, event); streamTaskHandleEvent(p->status.pSM, event);
} else if (!restored) { } else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);

View File

@ -7989,7 +7989,7 @@ int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (interval.interval > 0) { if (interval.interval > 0) {
pStmt->pReq->lastTs = taosTimeTruncate(lastTs, &interval); pStmt->pReq->lastTs = taosTimeAdd(taosTimeTruncate(lastTs, &interval), interval.interval, interval.intervalUnit, interval.precision);
} else { } else {
pStmt->pReq->lastTs = lastTs; pStmt->pReq->lastTs = lastTs;
} }

View File

@ -646,7 +646,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
ETaskStatus st = streamTaskGetStatus(pTask, NULL); ETaskStatus st = streamTaskGetStatus(pTask, NULL);
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__STREAM_SCAN_HISTORY || return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__STREAM_SCAN_HISTORY*/ ||
st == TASK_STATUS__CK); st == TASK_STATUS__CK);
} }

View File

@ -1484,7 +1484,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
continue; continue;
} }
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; EStreamTaskEvent event = /*(HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, event); stError("vgId:%d failed to handle event:%d", pMeta->vgId, event);

View File

@ -52,7 +52,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
ETaskStatus status = streamTaskGetStatus(pTask, &p); ETaskStatus status = streamTaskGetStatus(pTask, &p);
if ((status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY) && if ((status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/) &&
pTask->info.taskLevel != TASK_LEVEL__SOURCE) { pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
@ -158,7 +158,7 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
ETaskStatus status = streamTaskGetStatus(pTask, NULL); ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ASSERT(pTask->status.downstreamReady == 1 && ASSERT(pTask->status.downstreamReady == 1 &&
((status == TASK_STATUS__SCAN_HISTORY) || (status == TASK_STATUS__STREAM_SCAN_HISTORY))); ((status == TASK_STATUS__SCAN_HISTORY)/* || (status == TASK_STATUS__STREAM_SCAN_HISTORY)*/));
if (level == TASK_LEVEL__SOURCE) { if (level == TASK_LEVEL__SOURCE) {
return doStartScanHistoryTask(pTask); return doStartScanHistoryTask(pTask);
@ -374,10 +374,14 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
char* p = NULL; char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p); ETaskStatus status = streamTaskGetStatus(pTask, &p);
ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY); ASSERT(status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/);
stDebug("s-task:%s enter into scan-history data stage, status:%s", id, p); if (pTask->info.fillHistory == 1) {
streamTaskStartScanHistory(pTask); stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", id, p);
streamTaskStartScanHistory(pTask);
} else {
stDebug("s-task:%s scan wal data, status:%s", id, p);
}
// NOTE: there will be an deadlock if launch fill history here. // NOTE: there will be an deadlock if launch fill history here.
// // start the related fill-history task, when current task is ready // // start the related fill-history task, when current task is ready
@ -391,7 +395,7 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
void doProcessDownstreamReadyRsp(SStreamTask* pTask) { void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
EStreamTaskEvent event; EStreamTaskEvent event;
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
} else { } else {
event = TASK_EVENT_INIT_SCANHIST; event = TASK_EVENT_INIT_SCANHIST;
} }
@ -631,7 +635,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
char* p = NULL; char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p); ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status != TASK_STATUS__SCAN_HISTORY && status != TASK_STATUS__STREAM_SCAN_HISTORY) { if (status != TASK_STATUS__SCAN_HISTORY /*&& status != TASK_STATUS__STREAM_SCAN_HISTORY*/) {
stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, p, stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, p,
pReq->upstreamTaskId); pReq->upstreamTaskId);
@ -693,7 +697,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }
ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY); ASSERT(status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/);
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
// execute in the scan history complete call back msg, ready to process data from inputQ // execute in the scan history complete call back msg, ready to process data from inputQ

View File

@ -31,14 +31,14 @@ SStreamTaskState StreamTaskStatusList[9] = {
{.state = TASK_STATUS__HALT, .name = "halt"}, {.state = TASK_STATUS__HALT, .name = "halt"},
{.state = TASK_STATUS__PAUSE, .name = "paused"}, {.state = TASK_STATUS__PAUSE, .name = "paused"},
{.state = TASK_STATUS__CK, .name = "checkpoint"}, {.state = TASK_STATUS__CK, .name = "checkpoint"},
{.state = TASK_STATUS__STREAM_SCAN_HISTORY, .name = "stream-scan-history"}, // {.state = TASK_STATUS__STREAM_SCAN_HISTORY, .name = "stream-scan-history"},
}; };
SStreamEventInfo StreamTaskEventList[12] = { SStreamEventInfo StreamTaskEventList[12] = {
{.event = 0, .name = ""}, // dummy event, place holder {.event = 0, .name = ""}, // dummy event, place holder
{.event = TASK_EVENT_INIT, .name = "initialize"}, {.event = TASK_EVENT_INIT, .name = "initialize"},
{.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-init"}, {.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-init"},
{.event = TASK_EVENT_INIT_STREAM_SCANHIST, .name = "stream-scan-history-init"}, // {.event = TASK_EVENT_INIT_STREAM_SCANHIST, .name = "stream-scan-history-init"},
{.event = TASK_EVENT_SCANHIST_DONE, .name = "scan-history-completed"}, {.event = TASK_EVENT_SCANHIST_DONE, .name = "scan-history-completed"},
{.event = TASK_EVENT_STOP, .name = "stopping"}, {.event = TASK_EVENT_STOP, .name = "stopping"},
{.event = TASK_EVENT_GEN_CHECKPOINT, .name = "checkpoint"}, {.event = TASK_EVENT_GEN_CHECKPOINT, .name = "checkpoint"},
@ -110,12 +110,12 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
// todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE // todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE
static bool isInvalidStateTransfer(ETaskStatus state, const EStreamTaskEvent event) { static bool isInvalidStateTransfer(ETaskStatus state, const EStreamTaskEvent event) {
if (event == TASK_EVENT_INIT_STREAM_SCANHIST || event == TASK_EVENT_INIT || event == TASK_EVENT_INIT_SCANHIST) { if (/*event == TASK_EVENT_INIT_STREAM_SCANHIST || */event == TASK_EVENT_INIT || event == TASK_EVENT_INIT_SCANHIST) {
return (state != TASK_STATUS__UNINIT); return (state != TASK_STATUS__UNINIT);
} }
if (event == TASK_EVENT_SCANHIST_DONE) { if (event == TASK_EVENT_SCANHIST_DONE) {
return (state != TASK_STATUS__SCAN_HISTORY && state != TASK_STATUS__STREAM_SCAN_HISTORY); return (state != TASK_STATUS__SCAN_HISTORY/* && state != TASK_STATUS__STREAM_SCAN_HISTORY*/);
} }
if (event == TASK_EVENT_GEN_CHECKPOINT) { if (event == TASK_EVENT_GEN_CHECKPOINT) {
@ -482,14 +482,14 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); // trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans); // taosArrayPush(streamTaskSMTrans, &trans);
// scan-history related event // scan-history related event
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); // trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); // taosArrayPush(streamTaskSMTrans, &trans);
// halt stream task, from other task status // halt stream task, from other task status
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
@ -499,8 +499,8 @@ void doInitStateTransferTable(void) {
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); // trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans); // taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
@ -519,8 +519,8 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE}; info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); // trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(streamTaskSMTrans, &trans); // taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
@ -554,8 +554,8 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); // trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); // taosArrayPush(streamTaskSMTrans, &trans);
// dropping related event // dropping related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
@ -574,7 +574,7 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL, NULL, true); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); // trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); // taosArrayPush(streamTaskSMTrans, &trans);
} }
//clang-format on //clang-format on