diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 587e762448..17b87f2355 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -803,7 +803,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param); int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param); -void streamTaskRestoreStatus(SStreamTask* pTask); +int32_t streamTaskRestoreStatus(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask); int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index f173c327c7..3a72146a41 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -87,6 +87,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer // checkpoint ver is the kept version, handled data should be the next version. if (pTask->chkInfo.checkpointId != 0) { pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; + pChkInfo->processedVer = pChkInfo->checkpointVer; sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5ac44cbbae..81d9a9f13f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -835,6 +835,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { // checkpoint ver is the kept version, handled data should be the next version. if (pChkInfo->checkpointId != 0) { pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; + pChkInfo->processedVer = pChkInfo->checkpointVer; tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5c03aced32..aab2e6ab95 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -636,8 +636,6 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId, pDispatcher->taskId, nodeId, buf); } - } else { - // do nothing } } @@ -869,20 +867,15 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) { void streamTaskResume(SStreamTask* pTask) { SStreamTaskState prevState = *streamTaskGetStatus(pTask); + SStreamMeta* pMeta = pTask->pMeta; - - if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) { - streamTaskRestoreStatus(pTask); - - char* pNew = streamTaskGetStatus(pTask)->name; - if (prevState.state == TASK_STATUS__PAUSE) { - int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num); - } else { - stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, prevState.name); - } + int32_t code = streamTaskRestoreStatus(pTask); + if (code == TSDB_CODE_SUCCESS) { + char* pNew = streamTaskGetStatus(pTask)->name; + int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); + stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num); } else { - stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name); + stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name, pMeta->numOfPausedTasks); } } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 9ca5248157..6aa215586a 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -196,30 +196,61 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName, return code; } -void streamTaskRestoreStatus(SStreamTask* pTask) { +static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent event) { SStreamTaskSM* pSM = pTask->status.pSM; + bool removed = false; taosThreadMutexLock(&pTask->lock); - ASSERT(pSM->pActiveTrans == NULL); - ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT); + int32_t num = taosArrayGetSize(pSM->pWaitingEventList); + for (int32_t i = 0; i < num; ++i) { + SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i); + if (pInfo->event == event) { + taosArrayRemove(pSM->pWaitingEventList, i); + stDebug("s-task:%s pause event in waiting list not be handled yet, remove it from waiting list, remaining:%d", + pTask->id.idStr, pInfo->event); + removed = true; + break; + } + } - SStreamTaskState state = pSM->current; - pSM->current = pSM->prev.state; - - pSM->prev.state = state; - pSM->prev.evt = 0; - - pSM->startTs = taosGetTimestampMs(); - - if (taosArrayGetSize(pSM->pWaitingEventList) > 0) { - stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr, pSM->prev.state.name, pSM->current.name); - doHandleWaitingEvent(pSM, "restore-pause/halt", pTask); - } else { - stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name); + if (!removed) { + stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, StreamTaskEventList[event].name); } taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_SUCCESS; +} + +int32_t streamTaskRestoreStatus(SStreamTask* pTask) { + SStreamTaskSM* pSM = pTask->status.pSM; + int32_t code = 0; + + taosThreadMutexLock(&pTask->lock); + + if (pSM->current.state == TASK_STATUS__PAUSE && pSM->pActiveTrans == NULL) { + SStreamTaskState state = pSM->current; + pSM->current = pSM->prev.state; + + pSM->prev.state = state; + pSM->prev.evt = 0; + + pSM->startTs = taosGetTimestampMs(); + + if (taosArrayGetSize(pSM->pWaitingEventList) > 0) { + stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr, + pSM->prev.state.name, pSM->current.name); + doHandleWaitingEvent(pSM, "restore-pause/halt", pTask); + } else { + stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name); + } + } else { + removeEventInWaitingList(pTask, TASK_EVENT_PAUSE); + code = -1; // failed to restore the status + } + + taosThreadMutexUnlock(&pTask->lock); + return code; } SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {