refactor(stream): do some internal refactor.

This commit is contained in:
Haojun Liao 2023-10-19 15:09:47 +08:00
parent f4caeca24a
commit 402aefd95d
16 changed files with 213 additions and 187 deletions

View File

@ -67,7 +67,7 @@ enum {
};
typedef enum ETaskStatus {
TASK_STATUS__NORMAL = 0,
TASK_STATUS__READY = 0,
TASK_STATUS__DROPPING,
TASK_STATUS__UNINIT, // not used, an placeholder
TASK_STATUS__STOP,
@ -139,7 +139,7 @@ typedef enum EStreamTaskEvent {
TASK_EVENT_PAUSE = 0x8,
TASK_EVENT_RESUME = 0x9,
TASK_EVENT_HALT = 0xA,
TASK_EVENT_TRANS_STATE = 0xB,
TASK_EVENT_DROPPING = 0xB,
TASK_EVENT_SCAN_TSDB = 0xC,
TASK_EVENT_SCAN_WAL = 0xD,
} EStreamTaskEvent;
@ -714,12 +714,14 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t
void streamTaskInputFail(SStreamTask* pTask);
int32_t streamExecTask(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);
bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskShouldStop(const SStreamTask* pStatus);
bool streamTaskShouldPause(const SStreamTask* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
ETaskStatus streamTaskGetStatus(SStreamTask* pTask, char** pStr);
ETaskStatus streamTaskGetStatus(const SStreamTask* pTask, char** pStr);
void streamTaskResetStatus(SStreamTask* pTask);
void streamTaskSetStatusReady(SStreamTask* pTask);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
@ -753,7 +755,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
int32_t streamSetStatusUnint(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);

View File

@ -244,7 +244,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
// internal
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointInterval = 300;
int32_t tsStreamCheckpointInterval = 60;
float tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 30;
int32_t tsTtlUnit = 86400;

View File

@ -1170,7 +1170,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
}
}
{ // check if all tasks are in TASK_STATUS__NORMAL status
{ // check if all tasks are in TASK_STATUS__READY status
bool ready = true;
taosThreadMutexLock(&execNodeList.lock);
@ -1181,7 +1181,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
continue;
}
if (pEntry->status != TASK_STATUS__NORMAL) {
if (pEntry->status != TASK_STATUS__READY) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status));
ready = false;
@ -2614,7 +2614,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
pEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) {
if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status));
}
}

View File

@ -159,7 +159,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int32_t tqCheckAndRunStreamTask(STQ* pTq);
int32_t tqStartStreamTask(STQ* pTq);
int32_t tqStartStreamTasks(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq);

View File

@ -847,11 +847,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
}
// reset the task status from unfinished transaction
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__NORMAL;
}
// // reset the task status from unfinished transaction
// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
// tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr);
// pTask->status.taskStatus = TASK_STATUS__READY;
// }
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
@ -1029,7 +1029,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
bool restored = pTq->pVnode->restored;
if (p != NULL && restored && p->info.fillHistory == 0) {
EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_STREAM_SCANHIST;
streamTaskHandleEvent(p->status.pSM, event);
} else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
@ -1118,7 +1118,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamScanHistoryData(pTask);
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__PAUSE) {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status);
@ -1228,7 +1228,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamTaskRestoreStatus(pTask);
// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
// pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
// pTask->status.keepTaskStatus = TASK_STATUS__READY;
// qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id,
// streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus));
// }
@ -1352,7 +1352,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) {
tqCheckAndRunStreamTask(pTq);
tqStartStreamTask(pTq);
return 0;
}
@ -1365,7 +1365,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
char* p = NULL;
ETaskStatus st = streamTaskGetStatus(pTask, &p);
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) {
if (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.nextProcessVer);
streamExecTask(pTask);
@ -1514,7 +1514,6 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
return -1;
}
// todo: handle the case: resume from halt to pause/ from halt to normal/ from pause to normal
streamTaskResume(pTask, pTq->pStreamMeta);
int32_t level = pTask->info.taskLevel;
@ -1523,8 +1522,8 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
return 0;
}
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
// no lock needs to secure the access of the version
if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
// discard all the data when the stream task is suspended.
@ -1537,8 +1536,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
}
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory &&
pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0)) {
tqScanWalAsync(pTq, false);
@ -1867,7 +1865,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s receive nodeEp update msg from mnode", pTask->id.idStr);
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamSetStatusNormal(pTask);
streamTaskResetStatus(pTask);
SStreamTask** ppHTask = NULL;
if (pTask->hTaskInfo.id.taskId != 0) {
@ -1971,10 +1969,10 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
if (pTask->status.taskStatus == TASK_STATUS__CK) {
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) {
streamTaskClearCheckInfo(pTask);
taosArrayClear(pTask->pReadyMsgList);
streamSetStatusNormal(pTask);
streamTaskSetStatusReady(pTask);
}
streamMetaReleaseTask(pMeta, pTask);

View File

@ -592,7 +592,7 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
const char* id = pTask->id.idStr;
while (pTableSinkInfo->uid == 0) {
if (streamTaskShouldStop(&pTask->status)) {
if (streamTaskShouldStop(pTask)) {
tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
return TSDB_CODE_STREAM_EXEC_CANCELLED;
}
@ -773,7 +773,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
numOfBlocks);
for(int32_t i = 0; i < numOfBlocks; ++i) {
if (streamTaskShouldStop(&pTask->status)) {
if (streamTaskShouldStop(pTask)) {
return;
}
@ -823,7 +823,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
bool hasSubmit = false;
for (int32_t i = 0; i < numOfBlocks; i++) {
if (streamTaskShouldStop(&pTask->status)) {
if (streamTaskShouldStop(pTask)) {
taosHashCleanup(pTableIndexMap);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
return;

View File

@ -60,7 +60,7 @@ int32_t tqScanWal(STQ* pTq) {
return 0;
}
int32_t tqCheckAndRunStreamTask(STQ* pTq) {
int32_t tqStartStreamTask(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
@ -99,7 +99,7 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) {
continue;
}
EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_STREAM_SCANHIST;
streamTaskHandleEvent(pTask->status.pSM, event);
streamMetaReleaseTask(pMeta, pTask);
}
@ -240,9 +240,8 @@ int32_t tqStartStreamTasks(STQ* pTq) {
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
int8_t status = (*pTask)->status.taskStatus;
if (status == TASK_STATUS__STOP && (*pTask)->info.fillHistory != 1) {
streamSetStatusNormal(*pTask);
if ((*pTask)->info.fillHistory != 1) {
streamTaskResetStatus(*pTask);
}
}
@ -327,14 +326,14 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) {
// int32_t status = pTask->status.taskStatus;
char* p = NULL;
int32_t status = streamTaskGetStatus(pTask, &p);
if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__NORMAL) {
if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__READY) {
tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, p);
return false;
}
// fill-history task has entered into the last phase, no need to anything
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
ASSERT(status == TASK_STATUS__NORMAL);
ASSERT(status == TASK_STATUS__READY);
// the maximum version of data in the WAL has reached already, the step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
pTask->dataRange.range.maxVer);
@ -449,7 +448,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status != TASK_STATUS__NORMAL) {
if (status != TASK_STATUS__READY) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, p);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pStreamMeta, pTask);

View File

@ -48,7 +48,7 @@ typedef struct STaskStateTrans {
struct SStreamTaskSM {
SStreamTask* pTask;
SArray* pTransList; // SArray<STaskStateTrans>
// SArray* pTransList; // SArray<STaskStateTrans>
STaskStateTrans* pActiveTrans;
int64_t startTs;
SStreamTaskState current;

View File

@ -64,7 +64,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
int8_t status = atomic_load_8(&pTask->schedInfo.status);
stDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
stDebug("s-task:%s jump out of schedTimer", id);
return;
}

View File

@ -420,7 +420,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
const char* id = pTask->id.idStr;
int32_t msgId = pTask->execInfo.dispatch;
if (streamTaskShouldStop(&pTask->status)) {
if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
return;
@ -474,10 +474,10 @@ static void doRetryDispatchData(void* param, void* tmrId) {
}
if (code != TSDB_CODE_SUCCESS) {
if (!streamTaskShouldStop(&pTask->status)) {
if (!streamTaskShouldStop(pTask)) {
// stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
// atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
if (streamTaskShouldPause(&pTask->status)) {
if (streamTaskShouldPause(pTask)) {
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
} else {
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);

View File

@ -22,14 +22,13 @@
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
bool streamTaskShouldStop(const SStreamTask* pTask) {
ETaskStatus s = streamTaskGetStatus(pTask, NULL);
return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING);
}
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__PAUSE);
bool streamTaskShouldPause(const SStreamTask* pTask) {
return (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__PAUSE);
}
static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) {
@ -102,7 +101,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
pRes = taosArrayInit(4, sizeof(SSDataBlock));
}
if (streamTaskShouldStop(&pTask->status)) {
if (streamTaskShouldStop(pTask)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0;
}
@ -198,7 +197,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
qSetStreamOpOpen(exec);
while (!finished) {
if (streamTaskShouldPause(&pTask->status)) {
if (streamTaskShouldPause(pTask)) {
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
break;
@ -213,7 +212,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
int32_t size = 0;
int32_t numOfBlocks = 0;
while (1) {
if (streamTaskShouldStop(&pTask->status)) {
if (streamTaskShouldStop(pTask)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0;
}
@ -309,20 +308,18 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pStreamTask->id.idStr);
}
ASSERT(((pStreamTask->status.taskStatus == TASK_STATUS__STOP) ||
(pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) &&
int8_t status = streamTaskGetStatus(pStreamTask, NULL);
ASSERT(((status == TASK_STATUS__DROPPING) || (pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) &&
pTask->status.appendTranstateBlock == true);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
// todo. the dropping status should be append to the status after the halt completed.
// It must be halted for a source stream task, since when the related scan-history-data task start scan the history
// for the step 2.
int8_t status = streamTaskGetStatus(pStreamTask, NULL);//pStreamTask->status.taskStatus;
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING);
ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
} else {
ASSERT(status == TASK_STATUS__NORMAL);
ASSERT(status == TASK_STATUS__READY|| status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
}
@ -333,13 +330,14 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// In case of sink tasks, no need to halt them.
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
// start the task state transfer procedure.
// When a task is idle with halt status, all data in inputQ are consumed.
char* p = NULL;
streamTaskGetStatus(pStreamTask, &p);
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task.
stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
", status:normal, sched-status:%d",
", status:%s, sched-status:%d",
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
pTimeWindow->ekey, pStreamTask->status.schedStatus);
pTimeWindow->ekey, p, pStreamTask->status.schedStatus);
} else {
stDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
}
@ -366,6 +364,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 6. save to disk
taosWLockLatch(&pMeta->lock);
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
@ -525,7 +525,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int32_t blockSize = 0;
int32_t numOfBlocks = 0;
SStreamQueueItem* pInput = NULL;
if (streamTaskShouldStop(&pTask->status)) {
if (streamTaskShouldStop(pTask)) {
stDebug("s-task:%s stream task is stopped", id);
break;
}
@ -605,8 +605,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not
// be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING.
bool streamTaskIsIdle(const SStreamTask* pTask) {
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || pTask->status.taskStatus == TASK_STATUS__STOP ||
pTask->status.taskStatus == TASK_STATUS__DROPPING);
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || status == TASK_STATUS__STOP ||
status == TASK_STATUS__DROPPING);
}
int32_t streamExecTask(SStreamTask* pTask) {
@ -623,8 +624,8 @@ int32_t streamExecTask(SStreamTask* pTask) {
}
taosThreadMutexLock(&pTask->lock);
if ((streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0) || streamTaskShouldStop(&pTask->status) ||
streamTaskShouldPause(&pTask->status)) {
if ((streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0) || streamTaskShouldStop(pTask) ||
streamTaskShouldPause(pTask)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
taosThreadMutexUnlock(&pTask->lock);

View File

@ -461,7 +461,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
if (!streamTaskShouldStop(&(*ppTask)->status)) {
if (!streamTaskShouldStop(*ppTask)) {
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
@ -478,7 +478,7 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask)
if (ref > 0) {
stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
} else if (ref == 0) {
ASSERT(streamTaskShouldStop(&pTask->status));
ASSERT(streamTaskShouldStop(pTask));
stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr);
tFreeStreamTask(pTask);
} else if (ref < 0) {
@ -506,11 +506,15 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
pTask = *ppTask;
if (streamTaskShouldPause(&pTask->status)) {
// desc the paused task counter
if (streamTaskShouldPause(pTask)) {
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
}
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
// handle the dropping event
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING);
} else {
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
taosWUnLockLatch(&pMeta->lock);
@ -522,8 +526,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
while (1) {
taosRLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
if ((*ppTask)->status.timerActive == 0) {
taosRUnLockLatch(&pMeta->lock);
@ -548,15 +552,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId};
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId));
if (ppStreamTask != NULL) {
(*ppStreamTask)->hTaskInfo.id.taskId = 0;
(*ppStreamTask)->hTaskInfo.id.streamId = 0;
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
}
} else {
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
}
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
ASSERT(pTask->status.timerActive == 0);
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
@ -702,8 +704,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tFreeStreamTask(pTask);
STaskId id = streamTaskExtractKey(pTask);
taosArrayPush(pRecycleList, &id);
int32_t total = taosArrayGetSize(pRecycleList);
stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
continue;
@ -739,7 +741,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
}
if (streamTaskShouldPause(&pTask->status)) {
if (streamTaskShouldPause(pTask)) {
atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
}

View File

@ -165,7 +165,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
}
while (1) {
if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) {
stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
@ -346,7 +346,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
STaosQueue* pQueue = pTask->outputq.queue->pQueue;
while (streamQueueIsFull(pTask->outputq.queue)) {
if (streamTaskShouldStop(&pTask->status)) {
if (streamTaskShouldStop(pTask)) {
stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
return TSDB_CODE_STREAM_EXEC_CANCELLED;
}

View File

@ -263,14 +263,14 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
ASSERT(status == TASK_STATUS__NORMAL);
ASSERT(status == TASK_STATUS__READY);
// todo refactor: remove this later
if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id);
pTask->status.taskStatus = TASK_STATUS__DROPPING;
ASSERT(pTask->hTaskInfo.id.taskId == 0);
}
// if (pTask->info.fillHistory == 1) {
// stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id);
// pTask->status.taskStatus = TASK_STATUS__DROPPING;
// ASSERT(pTask->hTaskInfo.id.taskId == 0);
// }
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64,
@ -315,7 +315,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
int8_t status = pTask->status.taskStatus;
const char* str = streamGetTaskStatusStr(status);
ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__NORMAL);
ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__READY);
streamTaskSetRangeStreamCalc(pTask);
if (status == TASK_STATUS__SCAN_HISTORY) {
@ -341,7 +341,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
const char* id = pTask->id.idStr;
if (streamTaskShouldStop(&pTask->status)) {
if (streamTaskShouldStop(pTask)) {
stDebug("s-task:%s should stop, do not do check downstream again", id);
return TSDB_CODE_SUCCESS;
}
@ -447,18 +447,6 @@ int32_t streamRestoreParam(SStreamTask* pTask) {
return qRestoreStreamOperatorOption(pTask->exec.pExecutor);
}
int32_t streamSetStatusNormal(SStreamTask* pTask) {
int32_t status = atomic_load_8(&pTask->status.taskStatus);
if (status == TASK_STATUS__DROPPING) {
stError("s-task:%s cannot be set normal, since in dropping state", pTask->id.idStr);
return -1;
} else {
stDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
return 0;
}
}
// source
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);
@ -636,7 +624,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
if (ppTask) {
ASSERT((*ppTask)->status.timerActive >= 1);
if (streamTaskShouldStop(&(*ppTask)->status)) {
if (streamTaskShouldStop(*ppTask)) {
char* p = NULL;
streamTaskGetStatus((*ppTask), &p);
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
@ -677,7 +665,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId);
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
if (pHTask == NULL && (!streamTaskShouldStop(pTask))) {
char* p = NULL;
int32_t hTaskId = pHTaskInfo->id.taskId;
@ -776,7 +764,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
}
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__DROPPING) {
return 0;
}
@ -1049,7 +1037,7 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__PAUSE) {
pTask->status.taskStatus = pTask->status.keepTaskStatus;
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
pTask->status.keepTaskStatus = TASK_STATUS__READY;
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
@ -1065,14 +1053,14 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
// todo fix race condition
void streamTaskDisablePause(SStreamTask* pTask) {
// pre-condition check
const char* id = pTask->id.idStr;
while (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
stDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id);
taosMsleep(100);
}
stDebug("s-task:%s disable task pause", id);
pTask->status.pauseAllowed = 0;
// const char* id = pTask->id.idStr;
// while (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
// stDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id);
// taosMsleep(100);
// }
//
// stDebug("s-task:%s disable task pause", id);
// pTask->status.pauseAllowed = 0;
}
void streamTaskEnablePause(SStreamTask* pTask) {
@ -1092,7 +1080,7 @@ void streamTaskResumeFromHalt(SStreamTask* pTask) {
// }
// pTask->status.taskStatus = pTask->status.keepTaskStatus;
// pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
// pTask->status.keepTaskStatus = TASK_STATUS__READY;
streamTaskRestoreStatus(pTask);
streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s resume from halt, current status:%s", id, p);

View File

@ -58,7 +58,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory
pTask->id.idStr = taosStrdup(buf);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.taskStatus = (fillHistory || hasFillhistory) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__NORMAL;
pTask->status.taskStatus = (fillHistory || hasFillhistory) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
@ -581,13 +581,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
int64_t st = taosGetTimestampMs();
const char* id = pTask->id.idStr;
taosThreadMutexLock(&pTask->lock);
if (pTask->status.taskStatus == TASK_STATUS__CK) {
stDebug("s-task:%s in checkpoint will be discarded since task is stopped", id);
}
pTask->status.taskStatus = TASK_STATUS__STOP;
taosThreadMutexUnlock(&pTask->lock);
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) {
stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id,
@ -740,7 +734,7 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
const char* streamGetTaskStatusStr(int32_t status) {
switch(status) {
case TASK_STATUS__NORMAL: return "normal";
case TASK_STATUS__READY: return "normal";
case TASK_STATUS__SCAN_HISTORY: return "scan-history";
case TASK_STATUS__HALT: return "halt";
case TASK_STATUS__PAUSE: return "paused";

View File

@ -21,7 +21,7 @@
#include "wal.h"
SStreamTaskState StreamTaskStatusList[9] = {
{.state = TASK_STATUS__NORMAL, .name = "normal"},
{.state = TASK_STATUS__READY, .name = "ready"},
{.state = TASK_STATUS__DROPPING, .name = "dropped"},
{.state = TASK_STATUS__UNINIT, .name = "uninit"},
{.state = TASK_STATUS__STOP, .name = "stop"},
@ -45,9 +45,13 @@ SStreamEventInfo StreamTaskEventList[10] = {
{.event = TASK_EVENT_HALT, .name = "halting"},
};
static TdThreadOnce streamTaskStateMachineInit = PTHREAD_ONCE_INIT;
static SArray* streamTaskSMTrans = NULL;
static int32_t streamTaskInitStatus(SStreamTask* pTask);
static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask);
static int32_t initStateTransferTable(SStreamTaskSM* pSM);
static int32_t initStateTransferTable();
static void doInitStateTransferTable(void);
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
__state_trans_fn fn, __state_trans_succ_fn succFn,
@ -61,13 +65,7 @@ static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) {
stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
StreamTaskEventList[pEvtInfo->event].name, StreamTaskStatusList[pEvtInfo->status].name);
SStreamTaskSM* pSM = pTask->status.pSM;
if (pSM->eventList == NULL) {
}
taosArrayPush(pSM->eventList, pEvtInfo);
taosArrayPush(pTask->status.pSM->eventList, pEvtInfo);
return 0;
}
@ -84,8 +82,6 @@ int32_t streamTaskSetReadyForWal(SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr);
}
streamSetStatusNormal(pTask); // todo remove it
return TSDB_CODE_SUCCESS;
}
@ -109,9 +105,9 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
// todo optimize the perf of find the trans objs by using hash table
static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent event) {
int32_t numOfTrans = taosArrayGetSize(pState->pTransList);
int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans);
for (int32_t i = 0; i < numOfTrans; ++i) {
STaskStateTrans* pTrans = taosArrayGet(pState->pTransList, i);
STaskStateTrans* pTrans = taosArrayGet(streamTaskSMTrans, i);
if (pTrans->state.state == pState->current.state && pTrans->event == event) {
return pTrans;
}
@ -138,6 +134,7 @@ void streamTaskRestoreStatus(SStreamTask* pTask) {
}
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
initStateTransferTable();
const char* id = pTask->id.idStr;
SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM));
@ -161,14 +158,7 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
// set the initial state for the state-machine of stream task
pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
pSM->startTs = taosGetTimestampMs();
int32_t code = initStateTransferTable(pSM);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pSM->eventList);
taosMemoryFree(pSM);
return NULL;
}
return pSM;
}
@ -178,7 +168,6 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM) {
}
taosArrayDestroy(pSM->eventList);
taosArrayDestroy(pSM->pTransList);
taosMemoryFree(pSM);
return NULL;
}
@ -276,7 +265,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
return TSDB_CODE_SUCCESS;
}
ETaskStatus streamTaskGetStatus(SStreamTask* pTask, char** pStr) {
ETaskStatus streamTaskGetStatus(const SStreamTask* pTask, char** pStr) {
SStreamTaskState s = pTask->status.pSM->current; // copy one obj in case of multi-thread environment
if (pStr != NULL) {
*pStr = s.name;
@ -284,6 +273,28 @@ ETaskStatus streamTaskGetStatus(SStreamTask* pTask, char** pStr) {
return s.state;
}
void streamTaskResetStatus(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
pSM->pActiveTrans = NULL;
taosArrayClear(pSM->eventList);
}
void streamTaskSetStatusReady(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
if (pSM->current.state == TASK_STATUS__DROPPING) {
stError("s-task:%s task in dropping state, cannot be set ready", pTask->id.idStr);
return;
}
pSM->prev = pSM->current;
pSM->current = StreamTaskStatusList[TASK_STATUS__READY];
pSM->startTs = taosGetTimestampMs();
pSM->pActiveTrans = NULL;
taosArrayClear(pSM->eventList);
}
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,
__state_trans_succ_fn succFn, SAttachedEventInfo* pEventInfo, bool autoInvoke) {
STaskStateTrans trans = {0};
@ -301,92 +312,124 @@ STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStr
return trans;
}
int32_t initStateTransferTable(SStreamTaskSM* pSM) {
if (pSM->pTransList == NULL) {
pSM->pTransList = taosArrayInit(8, sizeof(STaskStateTrans));
if (pSM->pTransList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
int32_t initStateTransferTable() {
taosThreadOnce(&streamTaskStateMachineInit, doInitStateTransferTable);
return TSDB_CODE_SUCCESS;
}
void doInitStateTransferTable(void) {
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
// initialization event handle
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT,
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT,
streamTaskInitStatus, onNormalTaskReady, false, false);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST,
streamTaskInitStatus, onScanhistoryTaskReady, false, false);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS_STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST,
streamTaskInitStatus, onScanhistoryTaskReady, false, false);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_DONE,
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE,
streamTaskSetReadyForWal, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_DONE,
trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE,
streamTaskSetReadyForWal, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
// halt stream task, from other task status
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
SAttachedEventInfo info = {.status = TASK_STATUS__NORMAL, .event = TASK_EVENT_HALT};
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal,
&info, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
// checkpoint related event
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL,
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL,
streamTaskDoCheckpoint, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
trans =
createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// pause & resume related event handle
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
info = (SAttachedEventInfo){.status = TASK_STATUS__NORMAL, .event = TASK_EVENT_PAUSE};
info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
taosArrayPush(streamTaskSMTrans, &trans);
// resume is completed by restore status of state-machine
return 0;
// stop related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// dropping related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
}