fix(stream): handle pause event in waiting list.

This commit is contained in:
Haojun Liao 2024-02-19 15:57:21 +08:00
parent c5b26b406f
commit bf242a4f64
5 changed files with 57 additions and 31 deletions

View File

@ -803,7 +803,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param);
void streamTaskRestoreStatus(SStreamTask* pTask);
int32_t streamTaskRestoreStatus(SStreamTask* pTask);
int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,

View File

@ -87,6 +87,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
// checkpoint ver is the kept version, handled data should be the next version.
if (pTask->chkInfo.checkpointId != 0) {
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
pChkInfo->processedVer = pChkInfo->checkpointVer;
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}

View File

@ -835,6 +835,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
// checkpoint ver is the kept version, handled data should be the next version.
if (pChkInfo->checkpointId != 0) {
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
pChkInfo->processedVer = pChkInfo->checkpointVer;
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}

View File

@ -636,8 +636,6 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId,
pDispatcher->taskId, nodeId, buf);
}
} else {
// do nothing
}
}
@ -869,20 +867,15 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
void streamTaskResume(SStreamTask* pTask) {
SStreamTaskState prevState = *streamTaskGetStatus(pTask);
SStreamMeta* pMeta = pTask->pMeta;
if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) {
streamTaskRestoreStatus(pTask);
char* pNew = streamTaskGetStatus(pTask)->name;
if (prevState.state == TASK_STATUS__PAUSE) {
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
} else {
stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, prevState.name);
}
int32_t code = streamTaskRestoreStatus(pTask);
if (code == TSDB_CODE_SUCCESS) {
char* pNew = streamTaskGetStatus(pTask)->name;
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
} else {
stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name);
stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name, pMeta->numOfPausedTasks);
}
}

View File

@ -196,30 +196,61 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
return code;
}
void streamTaskRestoreStatus(SStreamTask* pTask) {
static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent event) {
SStreamTaskSM* pSM = pTask->status.pSM;
bool removed = false;
taosThreadMutexLock(&pTask->lock);
ASSERT(pSM->pActiveTrans == NULL);
ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT);
int32_t num = taosArrayGetSize(pSM->pWaitingEventList);
for (int32_t i = 0; i < num; ++i) {
SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i);
if (pInfo->event == event) {
taosArrayRemove(pSM->pWaitingEventList, i);
stDebug("s-task:%s pause event in waiting list not be handled yet, remove it from waiting list, remaining:%d",
pTask->id.idStr, pInfo->event);
removed = true;
break;
}
}
SStreamTaskState state = pSM->current;
pSM->current = pSM->prev.state;
pSM->prev.state = state;
pSM->prev.evt = 0;
pSM->startTs = taosGetTimestampMs();
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
doHandleWaitingEvent(pSM, "restore-pause/halt", pTask);
} else {
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
if (!removed) {
stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, StreamTaskEventList[event].name);
}
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskRestoreStatus(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
int32_t code = 0;
taosThreadMutexLock(&pTask->lock);
if (pSM->current.state == TASK_STATUS__PAUSE && pSM->pActiveTrans == NULL) {
SStreamTaskState state = pSM->current;
pSM->current = pSM->prev.state;
pSM->prev.state = state;
pSM->prev.evt = 0;
pSM->startTs = taosGetTimestampMs();
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr,
pSM->prev.state.name, pSM->current.name);
doHandleWaitingEvent(pSM, "restore-pause/halt", pTask);
} else {
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
}
} else {
removeEventInWaitingList(pTask, TASK_EVENT_PAUSE);
code = -1; // failed to restore the status
}
taosThreadMutexUnlock(&pTask->lock);
return code;
}
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {