From 7e866c55278f86b892f50a906f86734ad992495d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Feb 2024 11:32:04 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 7 -- source/libs/stream/src/streamMeta.c | 9 +- source/libs/stream/src/streamTaskSm.c | 113 ++++++++++---------------- 3 files changed, 50 insertions(+), 79 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 940a8e0c49..5ac44cbbae 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1039,14 +1039,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); - - // code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq); -// if (code == TSDB_CODE_SUCCESS) { -// doStartFillhistoryStep2(pTask, pStreamTask, pTq); -// } else { -// tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr); -// } streamMetaReleaseTask(pMeta, pStreamTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index db74ce9897..774a32b265 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -669,6 +669,13 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* i } } +static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + streamTaskSendCheckpointSourceRsp(pTask); + } + return 0; +} + int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { SStreamTask* pTask = NULL; @@ -687,7 +694,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } // handle the dropping event - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING); + streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL); } else { stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); streamMetaWUnLock(pMeta); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 593c2c5754..9ca5248157 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -59,15 +59,14 @@ static int32_t streamTaskInitStatus(SStreamTask* pTask); static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask); static int32_t initStateTransferTable(); static void doInitStateTransferTable(void); -static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask); static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, __state_trans_succ_fn succFn, - SFutureHandleEventInfo* pEventInfo, bool autoInvoke); + SFutureHandleEventInfo* pEventInfo); static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; } -static int32_t attachNextHandledEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) { +static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) { char* p = streamTaskGetStatus(pTask)->name; stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p, @@ -89,13 +88,6 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) { return 0; } -int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask) { - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - streamTaskSendCheckpointSourceRsp(pTask); - } - return 0; -} - int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) { stError("s-task:%s no related fill-history task, since it may have been dropped already", pTask->id.idStr); @@ -274,7 +266,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt const char* id = pTask->id.idStr; if (pTrans->attachEvent.event != 0) { - attachNextHandledEvent(pTask, &pTrans->attachEvent); + attachWaitedEvent(pTask, &pTrans->attachEvent); taosThreadMutexUnlock(&pTask->lock); while (1) { @@ -313,34 +305,13 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt static int32_t doHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans, __state_trans_user_fn callbackFn, void* param) { SStreamTask* pTask = pSM->pTask; - const char* id = pTask->id.idStr; - if (pTrans->attachEvent.event != 0) { SFutureHandleEventInfo info = pTrans->attachEvent; info.pParam = param; info.callBackFn = callbackFn; - attachNextHandledEvent(pTask, &info); + attachWaitedEvent(pTask, &info); taosThreadMutexUnlock(&pTask->lock); - -// while (1) { -// // wait for the task to be here -// taosThreadMutexLock(&pTask->lock); -// ETaskStatus s = streamTaskGetStatus(pTask)->state; -// taosThreadMutexUnlock(&pTask->lock); -// -// if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already -// stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event)); -// return TSDB_CODE_SUCCESS; -// } else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT) { -// stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event)); -// taosMsleep(100); -// } else { -// stDebug("s-task:%s is dropped or stopped already, not wait.", id); -// return TSDB_CODE_STREAM_INVALID_STATETRANS; -// } -// } - } else { // override current active trans pSM->pActiveTrans = pTrans; pSM->startTs = taosGetTimestampMs(); @@ -415,13 +386,13 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _ // no active event trans exists, handle this event directly pTrans = streamTaskFindTransform(pSM->current.state, event); if (pTrans == NULL) { - stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event)); + stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event), pSM->current.name); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_INVALID_STATETRANS; } if (pSM->pActiveTrans != NULL) { - // currently in some state transfer procedure, not auto invoke transfer, quit from this prcedure + // currently in some state transfer procedure, not auto invoke transfer, quit from this procedure stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now", pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name, pSM->pActiveTrans->next.name, GET_EVT_NAME(event)); @@ -555,7 +526,7 @@ void streamTaskSetStatusReady(SStreamTask* pTask) { } STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, - __state_trans_succ_fn succFn, SFutureHandleEventInfo* pEventInfo, bool autoInvoke) { + __state_trans_succ_fn succFn, SFutureHandleEventInfo* pEventInfo) { STaskStateTrans trans = {0}; trans.state = StreamTaskStatusList[current]; trans.next = StreamTaskStatusList[next]; @@ -570,7 +541,7 @@ STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStr trans.pAction = (fn != NULL) ? fn : dummyFn; trans.pSuccAction = (succFn != NULL) ? succFn : dummyFn; - trans.autoInvokeEndFn = autoInvoke; + trans.autoInvokeEndFn = (fn == NULL); return trans; } @@ -584,93 +555,93 @@ void doInitStateTransferTable(void) { streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans)); // initialization event handle - STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false); + STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, NULL); taosArrayPush(streamTaskSMTrans, &trans); // scan-history related event - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); // halt stream task, from other task status - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL); taosArrayPush(streamTaskSMTrans, &trans); SFutureHandleEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL); taosArrayPush(streamTaskSMTrans, &trans); // checkpoint related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); // pause & resume related event handle - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); info = (SFutureHandleEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE}; - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); // resume is completed by restore status of state-machine // stop related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); // dropping related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); } //clang-format on \ No newline at end of file