refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-01-02 17:10:12 +08:00
parent 29afc2ad6a
commit 2387d5bade
4 changed files with 15 additions and 5 deletions

View File

@ -792,6 +792,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, void* pFn);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
void streamTaskRestoreStatus(SStreamTask* pTask);

View File

@ -34,6 +34,8 @@ typedef int32_t (*__state_trans_succ_fn)(SStreamTask*);
typedef struct SAttachedEventInfo {
ETaskStatus status; // required status that this event can be handled
EStreamTaskEvent event; // the delayed handled event
void* pParam;
void* pFn;
} SAttachedEventInfo;
typedef struct STaskStateTrans {

View File

@ -356,9 +356,16 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
} else {
ASSERT(status == TASK_STATUS__READY|| status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id);
ASSERT(status == TASK_STATUS__READY || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
int32_t code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id,
pStreamTask->id.idStr, tstrerror(code));
streamMetaReleaseTask(pMeta, pStreamTask);
return code;
} else {
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id);
}
}
// wait for the stream task to handle all in the inputQ, and to be idle

View File

@ -278,10 +278,10 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
ETaskStatus s = streamTaskGetStatus(pTask, NULL);
taosThreadMutexUnlock(&pTask->lock);
if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {
if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already
stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event));
return TSDB_CODE_SUCCESS;
} else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP) { // this event has been handled already
} else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT) {
stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event));
taosMsleep(100);
} else {