refactor(stream): add state machine to manage the state of stream tasks.

This commit is contained in:
Haojun Liao 2023-10-19 10:34:20 +08:00
parent 54ec679b58
commit f4caeca24a
12 changed files with 307 additions and 143 deletions

View File

@ -75,6 +75,7 @@ typedef enum ETaskStatus {
TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause TASK_STATUS__PAUSE, // pause
TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
TASK_STATUS_STREAM_SCAN_HISTORY,
} ETaskStatus; } ETaskStatus;
enum { enum {
@ -129,17 +130,18 @@ enum {
typedef enum EStreamTaskEvent { typedef enum EStreamTaskEvent {
TASK_EVENT_INIT = 0x1, TASK_EVENT_INIT = 0x1,
TASK_EVENT_INIT_SCAN_HISTORY = 0x2, TASK_EVENT_INIT_SCANHIST = 0x2,
TASK_EVENT_SCANHIST_COMPLETED = 0x3, TASK_EVENT_INIT_STREAM_SCANHIST = 0x3,
TASK_EVENT_START = 0x4, TASK_EVENT_SCANHIST_DONE = 0x4,
TASK_EVENT_STOP = 0x5, TASK_EVENT_STOP = 0x5,
TASK_EVENT_GEN_CHECKPOINT = 0x6, TASK_EVENT_GEN_CHECKPOINT = 0x6,
TASK_EVENT_PAUSE = 0x7, TASK_EVENT_CHECKPOINT_DONE = 0x7,
TASK_EVENT_RESUME = 0x8, TASK_EVENT_PAUSE = 0x8,
TASK_EVENT_HALT = 0x9, TASK_EVENT_RESUME = 0x9,
TASK_EVENT_TRANS_STATE = 0xA, TASK_EVENT_HALT = 0xA,
TASK_EVENT_SCAN_TSDB = 0xB, TASK_EVENT_TRANS_STATE = 0xB,
TASK_EVENT_SCAN_WAL = 0xC, TASK_EVENT_SCAN_TSDB = 0xC,
TASK_EVENT_SCAN_WAL = 0xD,
} EStreamTaskEvent; } EStreamTaskEvent;
typedef struct { typedef struct {

View File

@ -1029,7 +1029,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
bool restored = pTq->pVnode->restored; bool restored = pTq->pVnode->restored;
if (p != NULL && restored && p->info.fillHistory == 0) { if (p != NULL && restored && p->info.fillHistory == 0) {
EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCAN_HISTORY; EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
streamTaskHandleEvent(p->status.pSM, event); streamTaskHandleEvent(p->status.pSM, event);
} else if (!restored) { } else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
@ -1259,7 +1259,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
#endif #endif
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_COMPLETED); streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
tqScanWalAsync(pTq, false); tqScanWalAsync(pTq, false);
} }
streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);
@ -1683,7 +1683,6 @@ FAIL:
return -1; return -1;
} }
// todo error code cannot be return, since this is invoked by an mnode-launched transaction.
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) { int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
@ -1694,7 +1693,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// disable auto rsp to mnode // disable auto rsp to mnode
pRsp->info.handle = NULL; pRsp->info.handle = NULL;
// todo: add counter to make sure other tasks would not be trapped in checkpoint state
SStreamCheckpointSourceReq req = {0}; SStreamCheckpointSourceReq req = {0};
if (!vnodeIsRoleLeader(pTq->pVnode)) { if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId); tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId);
@ -1725,6 +1723,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
// todo handle failure to reset from checkpoint procedure
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
@ -1735,6 +1734,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo handle failure to reset from checkpoint procedure
// downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req. // downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
if (pTask->status.downstreamReady != 1) { if (pTask->status.downstreamReady != 1) {
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
@ -1750,8 +1750,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo save the checkpoint failed info
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
if (pTask->status.taskStatus == TASK_STATUS__HALT) { if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__HALT) {
qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure", qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
pTask->id.idStr, req.checkpointId); pTask->id.idStr, req.checkpointId);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);

View File

@ -99,7 +99,7 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) {
continue; continue;
} }
EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCAN_HISTORY; EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
streamTaskHandleEvent(pTask->status.pSM, event); streamTaskHandleEvent(pTask->status.pSM, event);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} }

View File

@ -31,14 +31,19 @@ typedef struct SStreamTaskState {
typedef int32_t (*__state_trans_fn)(SStreamTask*); typedef int32_t (*__state_trans_fn)(SStreamTask*);
typedef int32_t (*__state_trans_succ_fn)(SStreamTask*); typedef int32_t (*__state_trans_succ_fn)(SStreamTask*);
typedef struct STaskStateTrans { typedef struct SAttachedEventInfo {
bool autoInvokeEndFn; ETaskStatus status;
SStreamTaskState state;
EStreamTaskEvent event; EStreamTaskEvent event;
SStreamTaskState next; } SAttachedEventInfo;
__state_trans_fn preAction;
__state_trans_fn pAction; typedef struct STaskStateTrans {
bool autoInvokeEndFn;
SStreamTaskState state;
EStreamTaskEvent event;
SStreamTaskState next;
__state_trans_fn pAction;
__state_trans_succ_fn pSuccAction; __state_trans_succ_fn pSuccAction;
SAttachedEventInfo attachEvent;
} STaskStateTrans; } STaskStateTrans;
struct SStreamTaskSM { struct SStreamTaskSM {
@ -48,6 +53,8 @@ struct SStreamTaskSM {
int64_t startTs; int64_t startTs;
SStreamTaskState current; SStreamTaskState current;
SStreamTaskState prev; SStreamTaskState prev;
// register the next handled event, if current state is not allowed to handle this event
SArray* eventList;
}; };
typedef struct SStreamEventInfo { typedef struct SStreamEventInfo {
@ -57,7 +64,7 @@ typedef struct SStreamEventInfo {
} SStreamEventInfo; } SStreamEventInfo;
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
void* streamDestroyStateMachine(SStreamTaskSM* pSM);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -69,7 +69,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
return; return;
} }
if (pTask->status.taskStatus == TASK_STATUS__CK) { if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) {
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
} else { } else {
if (status == TASK_TRIGGER_STATUS__ACTIVE) { if (status == TASK_TRIGGER_STATUS__ACTIVE) {
@ -267,8 +267,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
} }
// disable the data from upstream tasks // disable the data from upstream tasks
int8_t st = pTask->status.taskStatus; if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__HALT) {
if (st == TASK_STATUS__HALT) {
status = TASK_INPUT_STATUS__BLOCKED; status = TASK_INPUT_STATUS__BLOCKED;
} }

View File

@ -135,12 +135,13 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
// todo this status may not be set here.
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ. // 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
pTask->status.taskStatus = TASK_STATUS__CK; streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
pTask->checkpointingId = pReq->checkpointId; pTask->checkpointingId = pReq->checkpointId;
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.startTs = taosGetTimestampMs(); pTask->chkInfo.startTs = taosGetTimestampMs();
pTask->execInfo.checkpoint += 1; pTask->execInfo.checkpoint += 1;
// 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
@ -171,11 +172,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
// set the task status
pTask->checkpointingId = checkpointId;
// set task status // set task status
pTask->status.taskStatus = TASK_STATUS__CK; if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) {
pTask->checkpointingId = checkpointId;
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
}
{ // todo: remove this when the pipeline checkpoint generating is used. { // todo: remove this when the pipeline checkpoint generating is used.
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
@ -195,6 +196,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointBlock(pBlock, pTask); continueDispatchCheckpointBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info } else { // only one task exists, no need to dispatch downstream info
atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1);
streamProcessCheckpointReadyMsg(pTask); streamProcessCheckpointReadyMsg(pTask);
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
} }
@ -288,12 +290,14 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
continue; continue;
} }
int8_t prev = p->status.taskStatus;
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
p->chkInfo.checkpointId = p->checkpointingId; p->chkInfo.checkpointId = p->checkpointingId;
streamTaskClearCheckInfo(p); streamTaskClearCheckInfo(p);
streamSetStatusNormal(p);
char* str = NULL;
streamTaskGetStatus(p, &str);
streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
// save the task // save the task
streamMetaSaveTask(pMeta, p); streamMetaSaveTask(pMeta, p);
@ -302,13 +306,13 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
"vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, " "vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, "
"checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s", "checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer,
streamGetTaskStatusStr(prev)); str);
} }
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId, stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId,
checkpointId, terrstr()); checkpointId, terrstr());
return -1; return -1;
} else { } else {
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);

View File

@ -1011,7 +1011,7 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->pRspMsgList); int32_t num = taosArrayGetSize(pTask->pRspMsgList);
stDebug("s-task:%s add scan history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId, stDebug("s-task:%s add scan-history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,
num); num);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1027,7 +1027,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i); SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i);
tmsgSendRsp(&pInfo->msg); tmsgSendRsp(&pInfo->msg);
stDebug("s-task:%s level:%d notify upstream:0x%x continuing scan data in WAL", id, level, pInfo->taskId); stDebug("s-task:%s level:%d notify upstream:0x%x continuing handle data in WAL", id, level, pInfo->taskId);
} }
taosArrayClear(pTask->pRspMsgList); taosArrayClear(pTask->pRspMsgList);

View File

@ -324,7 +324,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
} else { } else {
ASSERT(status == TASK_STATUS__NORMAL); ASSERT(status == TASK_STATUS__NORMAL);
streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
// pStreamTask->status.taskStatus = TASK_STATUS__HALT;
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
} }
@ -375,7 +374,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 7. pause allowed. // 7. pause allowed.
streamTaskEnablePause(pStreamTask); streamTaskEnablePause(pStreamTask);
if (taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) { if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
@ -492,7 +491,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock); code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
if (code == 0) { if (code == 0) {
streamDispatchStreamBlock(pTask); streamDispatchStreamBlock(pTask);
} else { } else { // todo put into queue failed, retry
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
} }
} else { // level == TASK_LEVEL__SINK } else { // level == TASK_LEVEL__SINK

View File

@ -894,7 +894,7 @@ void metaHbToMnode(void* param, void* tmrId) {
STaskStatusEntry entry = { STaskStatusEntry entry = {
.id = *pId, .id = *pId,
.status = (*pTask)->status.taskStatus, .status = streamTaskGetStatus(*pTask, NULL),
.nodeId = pMeta->vgId, .nodeId = pMeta->vgId,
.stage = pMeta->stage, .stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)), .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)),

View File

@ -272,8 +272,12 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
ASSERT(pTask->hTaskInfo.id.taskId == 0); ASSERT(pTask->hTaskInfo.id.taskId == 0);
} }
stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader)); stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64,
id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader));
} else {
stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p, pTask->status.schedStatus);
}
streamTaskEnablePause(pTask); streamTaskEnablePause(pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -520,11 +524,13 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
int32_t taskLevel = pTask->info.taskLevel; int32_t taskLevel = pTask->info.taskLevel;
ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK); ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);
char* p = NULL; const char* id = pTask->id.idStr;
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p); ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status != TASK_STATUS__SCAN_HISTORY) { if (status != TASK_STATUS__SCAN_HISTORY) {
stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly",
pTask->id.idStr, p, pReq->upstreamTaskId); id, p, pReq->upstreamTaskId);
void* pBuf = NULL; void* pBuf = NULL;
int32_t len = 0; int32_t len = 0;
@ -534,8 +540,8 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len); initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len);
tmsgSendRsp(&msg); tmsgSendRsp(&msg);
stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", pTask->id.idStr, stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id,
pTask->info.taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId); taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId);
return 0; return 0;
} }
@ -547,13 +553,15 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
if (left == 0) { if (left == 0) {
int32_t numOfTasks = taosArrayGetSize(pTask->upstreamInfo.pList); int32_t numOfTasks = taosArrayGetSize(pTask->upstreamInfo.pList);
stDebug( if (taskLevel == TASK_LEVEL__AGG) {
"s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send " stDebug(
"rsp to all upstream tasks", "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data processing "
pTask->id.idStr, numOfTasks); "and send rsp to all upstream tasks",
id, numOfTasks);
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamAggUpstreamScanHistoryFinish(pTask); streamAggUpstreamScanHistoryFinish(pTask);
} else {
stDebug("s-task:%s all %d upstream task(s) finish scan-history data, and rsp to all upstream tasks", id,
numOfTasks);
} }
// all upstream tasks have completed the scan-history task in the stream time window, let's start to extract data // all upstream tasks have completed the scan-history task in the stream time window, let's start to extract data
@ -564,14 +572,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
if (taskLevel == TASK_LEVEL__AGG) { if (taskLevel == TASK_LEVEL__AGG) {
/*int32_t code = */streamTaskScanHistoryDataComplete(pTask); /*int32_t code = */streamTaskScanHistoryDataComplete(pTask);
} else { // for sink task, set normal } else { // for sink task, set normal
if (pTask->status.taskStatus != TASK_STATUS__PAUSE && pTask->status.taskStatus != TASK_STATUS__STOP && streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
pTask->status.taskStatus != TASK_STATUS__DROPPING) {
streamSetStatusNormal(pTask);
}
} }
} else { } else {
stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left); id, pReq->upstreamTaskId, pReq->childId, left);
} }
return 0; return 0;
@ -582,7 +587,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
// execute in the scan history complete call back msg, ready to process data from inputQ // execute in the scan history complete call back msg, ready to process data from inputQ
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_COMPLETED); streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
streamTaskSetSchedStatusInactive(pTask); streamTaskSetSchedStatusInactive(pTask);
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
@ -619,7 +624,7 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
} }
// check if downstream tasks have been ready // check if downstream tasks have been ready
streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCAN_HISTORY); streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST);
} }
static void tryLaunchHistoryTask(void* param, void* tmrId) { static void tryLaunchHistoryTask(void* param, void* tmrId) {
@ -903,7 +908,7 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange; SDataRange* pRange = &pTask->dataRange;
if (pTask->hTaskInfo.id.taskId == 0) { if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64, stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64,
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
@ -949,7 +954,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause // normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
#if 0
int8_t status = pTask->status.taskStatus; int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING) { if (status == TASK_STATUS__DROPPING) {
stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
@ -1007,6 +1012,13 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
#endif
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE);
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
// in case of fill-history task, stop the tsdb file scan operation. // in case of fill-history task, stop the tsdb file scan operation.
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
void* pExecutor = pTask->exec.pExecutor; void* pExecutor = pTask->exec.pExecutor;
@ -1019,6 +1031,21 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
} }
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) { void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status == TASK_STATUS__PAUSE) {
streamTaskRestoreStatus(pTask);
streamTaskGetStatus(pTask, &p);
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr,
p, num);
} else {
stDebug("s-task:%s status:%s not in pause status, no need to resume", pTask->id.idStr, p);
}
#if 0
int8_t status = pTask->status.taskStatus; int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__PAUSE) { if (status == TASK_STATUS__PAUSE) {
pTask->status.taskStatus = pTask->status.keepTaskStatus; pTask->status.taskStatus = pTask->status.keepTaskStatus;
@ -1031,6 +1058,8 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
} else { } else {
stError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); stError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
} }
#endif
} }
// todo fix race condition // todo fix race condition

View File

@ -304,11 +304,9 @@ static void freeUpstreamItem(void* p) {
void tFreeStreamTask(SStreamTask* pTask) { void tFreeStreamTask(SStreamTask* pTask) {
int32_t taskId = pTask->id.taskId; int32_t taskId = pTask->id.taskId;
char* p = NULL;
streamTaskGetStatus(pTask, &p);
STaskExecStatisInfo* pStatis = &pTask->execInfo; STaskExecStatisInfo* pStatis = &pTask->execInfo;
stDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState, p); stDebug("start to free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState);
stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64 stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64 ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
@ -394,6 +392,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->pRspMsgList = NULL; pTask->pRspMsgList = NULL;
} }
pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM);
streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo); streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList); pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList);

View File

@ -14,39 +14,63 @@
*/ */
#include "streamInt.h" #include "streamInt.h"
#include "streamsm.h"
#include "tmisce.h" #include "tmisce.h"
#include "tstream.h" #include "tstream.h"
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
#include "streamsm.h"
SStreamTaskState StreamTaskStatusList[8] = { SStreamTaskState StreamTaskStatusList[9] = {
{.state = TASK_STATUS__NORMAL, .name = "normal"}, {.state = TASK_STATUS__NORMAL, .name = "normal"},
{.state = TASK_STATUS__DROPPING, .name = "dropping"}, {.state = TASK_STATUS__DROPPING, .name = "dropped"},
{.state = TASK_STATUS__UNINIT, .name = "uninit"}, {.state = TASK_STATUS__UNINIT, .name = "uninit"},
{.state = TASK_STATUS__STOP, .name = "stop"}, {.state = TASK_STATUS__STOP, .name = "stop"},
{.state = TASK_STATUS__SCAN_HISTORY, .name = "scan-history"}, {.state = TASK_STATUS__SCAN_HISTORY, .name = "scan-history"},
{.state = TASK_STATUS__HALT, .name = "halt"}, {.state = TASK_STATUS__HALT, .name = "halt"},
{.state = TASK_STATUS__PAUSE, .name = "paused"}, {.state = TASK_STATUS__PAUSE, .name = "paused"},
{.state = TASK_STATUS__CK, .name = "checkpoint"}, {.state = TASK_STATUS__CK, .name = "checkpoint"},
{.state = TASK_STATUS_STREAM_SCAN_HISTORY, .name = "stream-scan-history"},
}; };
SStreamEventInfo StreamTaskEventList[8] = { SStreamEventInfo StreamTaskEventList[10] = {
{}, // dummy event, place holder {}, // dummy event, place holder
{.event = TASK_EVENT_INIT, .name = "initialize"}, {.event = TASK_EVENT_INIT, .name = "initialize"},
{.event = TASK_EVENT_INIT_SCAN_HISTORY, .name = "scan-history-initialize"}, {.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-initialize"},
{.event = TASK_EVENT_SCANHIST_COMPLETED, .name = "scan-history-completed"}, {.event = TASK_EVENT_SCANHIST_DONE, .name = "scan-history-completed"},
{.event = TASK_EVENT_STOP, .name = "stopping"},
{.event = TASK_EVENT_GEN_CHECKPOINT, .name = "checkpoint"},
{.event = TASK_EVENT_CHECKPOINT_DONE, .name = "checkpoint-done"},
{.event = TASK_EVENT_PAUSE, .name = "pausing"},
{.event = TASK_EVENT_RESUME, .name = "resuming"},
{.event = TASK_EVENT_HALT, .name = "halting"},
}; };
static int32_t initStateTransferTable(SStreamTaskSM* pSM);
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
__state_trans_fn preFn, __state_trans_fn fn, __state_trans_succ_fn succFn,
bool autoInvoke);
static int32_t streamTaskInitStatus(SStreamTask* pTask); static int32_t streamTaskInitStatus(SStreamTask* pTask);
static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask); static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask);
static int32_t initStateTransferTable(SStreamTaskSM* pSM);
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
__state_trans_fn fn, __state_trans_succ_fn succFn,
SAttachedEventInfo* pEventInfo, bool autoInvoke);
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; } static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; }
/**
 * Record an event that the task cannot handle in its current status, by
 * appending it to the event list of the task's state machine.
 *
 * The list is created on first use. The entry is pushed with taosArrayPush,
 * which is expected to copy the element into the array (confirm against
 * tarray.h), so the caller keeps ownership of pEvtInfo.
 *
 * @param pTask    task whose state machine (pTask->status.pSM) receives the event
 * @param pEvtInfo the event together with the status it requires
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY if the list cannot be created
 *         or the entry cannot be appended
 */
static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) {
  char* p = NULL;
  streamTaskGetStatus(pTask, &p);

  stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
          StreamTaskEventList[pEvtInfo->event].name, StreamTaskStatusList[pEvtInfo->status].name);

  SStreamTaskSM* pSM = pTask->status.pSM;

  // The list may not have been allocated yet; pushing into a NULL array would crash.
  if (pSM->eventList == NULL) {
    pSM->eventList = taosArrayInit(4, sizeof(SAttachedEventInfo));
    if (pSM->eventList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // taosArrayPush returns NULL when the array cannot grow.
  if (taosArrayPush(pSM->eventList, pEvtInfo) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return 0;
}
int32_t streamTaskInitStatus(SStreamTask* pTask) { int32_t streamTaskInitStatus(SStreamTask* pTask) {
pTask->execInfo.init = taosGetTimestampMs(); pTask->execInfo.init = taosGetTimestampMs();
@ -57,49 +81,36 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) {
} }
// Action registered for the scan-history-done transitions: calls streamSetStatusNormal() on the
// task and returns TSDB_CODE_SUCCESS.
// Each line below carries the old text followed by the revised text. In the revised text the
// "ready for extract data from wal" debug line is emitted only for TASK_LEVEL__SOURCE tasks,
// while streamSetStatusNormal() is still called for every task.
int32_t streamTaskSetReadyForWal(SStreamTask* pTask) { int32_t streamTaskSetReadyForWal(SStreamTask* pTask) {
stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamSetStatusNormal(pTask); // todo remove it stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr);
}
streamSetStatusNormal(pTask); // todo remove it
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// Action hook for the NORMAL -> PAUSE transition: emits a debug trace only and reports success (0).
static int32_t streamTaskDoPause(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;

  stDebug("s-task:%s start to pause tasks", id);
  return 0;
}
// Action hook for the PAUSE -> NORMAL transition: emits a debug trace only and reports success (0).
static int32_t streamTaskDoResume(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;

  stDebug("s-task:%s start to resume tasks", id);
  return 0;
}
// Action hook for the checkpoint transition: logs a debug trace and returns 0; no other work
// is done here. (Each line holds the unchanged old and new text side by side.)
static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) { static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) {
stDebug("s-task:%s start to do checkpoint", pTask->id.idStr); stDebug("s-task:%s start to do checkpoint", pTask->id.idStr);
return 0; return 0;
} }
// Block the caller until the task reports TASK_STATUS__NORMAL, re-checking every 100ms and
// logging the current status name on each miss. Always returns TSDB_CODE_SUCCESS.
int32_t streamTaskWaitBeforeHalt(SStreamTask* pTask) {
  char* statusName = NULL;

  for (;;) {
    if (streamTaskGetStatus(pTask, &statusName) == TASK_STATUS__NORMAL) {
      break;
    }

    stDebug("related stream task:%s(status:%s) not ready for halt, wait for 100ms and retry", pTask->id.idStr, statusName);
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}
// Halt action: remembers in hTaskInfo.haltVer the version currently reported by the task's WAL
// reader; when the reader reports -1, dataRange.range.maxVer + 1 is stored instead.
// Asserts that the task has a related fill-history task. Always returns TSDB_CODE_SUCCESS.
// Each line below carries the old text followed by the revised text; the revised text performs
// the haltVer bookkeeping only for TASK_LEVEL__SOURCE tasks.
int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
pTask->hTaskInfo.haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->hTaskInfo.haltVer == -1) { pTask->hTaskInfo.haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
pTask->hTaskInfo.haltVer = pTask->dataRange.range.maxVer + 1; if (pTask->hTaskInfo.haltVer == -1) {
pTask->hTaskInfo.haltVer = pTask->dataRange.range.maxVer + 1;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo optimize the perf of find the trans objs by using hash table // todo optimize the perf of find the trans objs by using hash table
static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent event) { static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent event) {
int32_t numOfTrans = taosArrayGetSize(pState->pTransList); int32_t numOfTrans = taosArrayGetSize(pState->pTransList);
for(int32_t i = 0; i < numOfTrans; ++i) { for (int32_t i = 0; i < numOfTrans; ++i) {
STaskStateTrans* pTrans = taosArrayGet(pState->pTransList, i); STaskStateTrans* pTrans = taosArrayGet(pState->pTransList, i);
if (pTrans->state.state == pState->current.state && pTrans->event == event) { if (pTrans->state.state == pState->current.state && pTrans->event == event) {
return pTrans; return pTrans;
@ -115,6 +126,8 @@ void streamTaskRestoreStatus(SStreamTask* pTask) {
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
ASSERT(pSM->pActiveTrans == NULL); ASSERT(pSM->pActiveTrans == NULL);
ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT);
SStreamTaskState state = pSM->current; SStreamTaskState state = pSM->current;
pSM->current = pSM->prev; pSM->current = pSM->prev;
pSM->prev = state; pSM->prev = state;
@ -125,46 +138,89 @@ void streamTaskRestoreStatus(SStreamTask* pTask) {
} }
// Allocate and initialise the state machine for pTask.
// Steps (revised text): zero-allocate the SStreamTaskSM, create the pending-event list
// (initial capacity 4, element type SAttachedEventInfo), set the current state to
// TASK_STATUS__UNINIT, stamp startTs, then fill the transition table via initStateTransferTable().
// Returns the new state machine, or NULL on failure. On the two allocation failures terrno is
// set to TSDB_CODE_OUT_OF_MEMORY and an error is logged; everything allocated so far is released
// on every failure path.
// Lines that appear twice hold the old text followed by the revised text; single-sided lines
// (task-id alias, eventList handling, error logging) exist only in the revised text.
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM)); SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM));
if (pSM == NULL) { if (pSM == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:%s failed to create task stateMachine, size:%d, code:%s", id, (int32_t)sizeof(SStreamTaskSM),
tstrerror(terrno));
return NULL; return NULL;
} }
pSM->pTask = pTask; pSM->pTask = pTask;
// list of events deferred by attachEvent() until the task reaches the required status
pSM->eventList = taosArrayInit(4, sizeof(SAttachedEventInfo));
if (pSM->eventList == NULL) {
taosMemoryFree(pSM);
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:%s failed to create task stateMachine, size:%d, code:%s", id, (int32_t)sizeof(SStreamTaskSM),
tstrerror(terrno));
return NULL;
}
// set the initial state for the state-machine of stream task // set the initial state for the state-machine of stream task
pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT]; pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
pSM->startTs = taosGetTimestampMs(); pSM->startTs = taosGetTimestampMs();
int32_t code = initStateTransferTable(pSM); int32_t code = initStateTransferTable(pSM);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pSM->eventList);
taosMemoryFree(pSM); taosMemoryFree(pSM);
return NULL; return NULL;
} }
return pSM; return pSM;
} }
// Release a state machine: its pending-event list, its transition table, and then the object
// itself. A NULL argument is accepted and ignored. Always returns NULL.
void* streamDestroyStateMachine(SStreamTaskSM* pSM) {
  if (pSM != NULL) {
    taosArrayDestroy(pSM->eventList);
    taosArrayDestroy(pSM->pTransList);
    taosMemoryFree(pSM);
  }

  return NULL;
}
// Dispatch `event` to the state machine. The transition registered for (current state, event)
// is looked up with streamTaskFindTransform(), then:
//   - revised text, transition carries an attached event (attachEvent.event != 0): the event is
//     queued with attachEvent(), the task lock is released, and the caller polls every 100ms
//     until the task status equals attachEvent.status, then returns TSDB_CODE_SUCCESS;
//   - otherwise: the transition is recorded in pActiveTrans, startTs is stamped, the lock is
//     released, pAction is run, and streamTaskOnHandleEventSuccess() is called when
//     autoInvokeEndFn is set.
// Lines below carry the old text followed by the revised text. The old text ran preAction first
// and returned pAction's code; the revised text takes the task lock on entry and always returns
// TSDB_CODE_SUCCESS (pAction's result is not propagated, see the todo).
// NOTE(review): pTrans is dereferenced without a NULL check — confirm streamTaskFindTransform()
// cannot come back empty for a (state, event) pair that has no registered transition.
// NOTE(review): StreamTaskEventList is indexed directly by the event value — confirm the table
// layout matches the EStreamTaskEvent numbering.
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
taosThreadMutexLock(&pSM->pTask->lock);
STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event); STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event);
stDebug("s-task:%s start to handle event:%s, state:%s", pSM->pTask->id.idStr, StreamTaskEventList[event].name, stDebug("s-task:%s start to handle event:%s, state:%s", pSM->pTask->id.idStr, StreamTaskEventList[event].name,
pSM->current.name); pSM->current.name);
int32_t code = pTrans->preAction(pSM->pTask); if (pTrans->attachEvent.event != 0) {
attachEvent(pSM->pTask, &pTrans->attachEvent);
taosThreadMutexUnlock(&pSM->pTask->lock);
taosThreadMutexLock(&pSM->pTask->lock); while (1) {
ASSERT(pSM->pActiveTrans == NULL); // wait for the task to be here
pSM->pActiveTrans = pTrans; ETaskStatus s = streamTaskGetStatus(pSM->pTask, NULL);
pSM->startTs = taosGetTimestampMs(); if (s == pTrans->attachEvent.status) {
taosThreadMutexUnlock(&pSM->pTask->lock); return TSDB_CODE_SUCCESS;
} else {// this event has been handled already
stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pSM->pTask->id.idStr,
StreamTaskEventList[event].name);
taosMsleep(100);
}
}
code = pTrans->pAction(pSM->pTask); } else {
// todo handle error code; ASSERT(pSM->pActiveTrans == NULL);
pSM->pActiveTrans = pTrans;
pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pSM->pTask->lock);
if (pTrans->autoInvokeEndFn) { int32_t code = pTrans->pAction(pSM->pTask);
streamTaskOnHandleEventSuccess(pSM); // todo handle error code;
if (pTrans->autoInvokeEndFn) {
streamTaskOnHandleEventSuccess(pSM);
}
} }
return code; return TSDB_CODE_SUCCESS;
} }
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
@ -181,11 +237,42 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
// on success callback, add into lock if necessary, or maybe we should add an option for this? // on success callback, add into lock if necessary, or maybe we should add an option for this?
pTrans->pSuccAction(pTask); pTrans->pSuccAction(pTask);
taosThreadMutexUnlock(&pTask->lock);
int64_t el = (taosGetTimestampMs() - pSM->startTs); if (taosArrayGetSize(pSM->eventList) > 0) {
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, int64_t el = (taosGetTimestampMs() - pSM->startTs);
StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name); stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name);
SAttachedEventInfo* pEvtInfo = taosArrayPop(pSM->eventList);
// OK, let's handle the attached event, since the task has reached the required status now
if (pSM->current.state == pEvtInfo->status) {
stDebug("s-task:%s handle the attached event:%s, state:%s", pTask->id.idStr,
StreamTaskEventList[pEvtInfo->event].name, pSM->current.name);
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM, pEvtInfo->event);
ASSERT(pSM->pActiveTrans == NULL);
pSM->pActiveTrans = pNextTrans;
pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock);
int32_t code = pNextTrans->pAction(pSM->pTask);
if (pTrans->autoInvokeEndFn) {
return streamTaskOnHandleEventSuccess(pSM);
} else {
return code;
}
}
} else {
taosThreadMutexUnlock(&pTask->lock);
int64_t el = (taosGetTimestampMs() - pSM->startTs);
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -197,17 +284,19 @@ ETaskStatus streamTaskGetStatus(SStreamTask* pTask, char** pStr) {
return s.state; return s.state;
} }
// Build one transition-table entry and return it by value.
//   current / next : source and target states, copied from StreamTaskStatusList
//   event          : event that triggers the transition
//   fn / succFn    : action and on-success callbacks; NULL is replaced with dummyFn
//   pEventInfo     : (revised text) optional attached event, copied into trans.attachEvent;
//                    when NULL, attachEvent keeps the zero value from `trans = {0}`
//   autoInvoke     : stored in autoInvokeEndFn
// Lines below carry the old text followed by the revised text: the old signature took a
// preFn callback (stored in preAction); the revised signature drops it and adds pEventInfo.
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,
__state_trans_fn preFn, __state_trans_fn fn, __state_trans_succ_fn succFn, __state_trans_succ_fn succFn, SAttachedEventInfo* pEventInfo, bool autoInvoke) {
bool autoInvoke) {
STaskStateTrans trans = {0}; STaskStateTrans trans = {0};
trans.state = StreamTaskStatusList[current]; trans.state = StreamTaskStatusList[current];
trans.next = StreamTaskStatusList[next]; trans.next = StreamTaskStatusList[next];
trans.event = event; trans.event = event;
trans.preAction = (preFn != NULL)? preFn:dummyFn; if (pEventInfo != NULL) {
trans.pAction = (fn != NULL)? fn : dummyFn; trans.attachEvent = *pEventInfo;
trans.pSuccAction = (succFn != NULL)? succFn:dummyFn; }
trans.pAction = (fn != NULL) ? fn : dummyFn;
trans.pSuccAction = (succFn != NULL) ? succFn : dummyFn;
trans.autoInvokeEndFn = autoInvoke; trans.autoInvokeEndFn = autoInvoke;
return trans; return trans;
} }
@ -221,49 +310,83 @@ int32_t initStateTransferTable(SStreamTaskSM* pSM) {
} }
// initialization event handle // initialization event handle
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT, NULL, STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT,
streamTaskInitStatus, onNormalTaskReady, false); streamTaskInitStatus, onNormalTaskReady, false, false);
taosArrayPush(pSM->pTransList, &trans); taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCAN_HISTORY, NULL, trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST,
streamTaskInitStatus, onScanhistoryTaskReady, false); streamTaskInitStatus, onScanhistoryTaskReady, false, false);
taosArrayPush(pSM->pTransList, &trans); taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_COMPLETED, NULL, trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS_STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST,
streamTaskSetReadyForWal, NULL, true); streamTaskInitStatus, onScanhistoryTaskReady, false, false);
taosArrayPush(pSM->pTransList, &trans); taosArrayPush(pSM->pTransList, &trans);
// pause & resume related event handle trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_DONE,
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, streamTaskDoPause, NULL, streamTaskSetReadyForWal, NULL, NULL, true);
true);
taosArrayPush(pSM->pTransList, &trans); taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__NORMAL, TASK_EVENT_RESUME, NULL, streamTaskDoResume, trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_DONE,
NULL, true); streamTaskSetReadyForWal, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans); taosArrayPush(pSM->pTransList, &trans);
// halt stream task, from other task status
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
SAttachedEventInfo info = {.status = TASK_STATUS__NORMAL, .event = TASK_EVENT_HALT};
trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal,
&info, true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
// checkpoint related event
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL,
streamTaskDoCheckpoint, NULL, true); streamTaskDoCheckpoint, NULL, true);
taosArrayPush(pSM->pTransList, &trans); taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); trans =
createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans); taosArrayPush(pSM->pTransList, &trans);
// halt stream task, from other task status // pause & resume related event handle
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, NULL, trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
streamTaskKeepCurrentVerInWal, true);
taosArrayPush(pSM->pTransList, &trans); taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, streamTaskWaitBeforeHalt, trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
NULL, streamTaskKeepCurrentVerInWal, true);
taosArrayPush(pSM->pTransList, &trans); taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, streamTaskWaitBeforeHalt, NULL, info = (SAttachedEventInfo){.status = TASK_STATUS__NORMAL, .event = TASK_EVENT_PAUSE};
streamTaskKeepCurrentVerInWal, true); trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(pSM->pTransList, &trans); taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, NULL, trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
streamTaskKeepCurrentVerInWal, true);
taosArrayPush(pSM->pTransList, &trans); taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
// resume is completed by restore status of state-machine
return 0; return 0;
} }