fix(stream): fix race condition when state transferring.

This commit is contained in:
Haojun Liao 2023-10-29 02:27:27 +08:00
parent 1a1b9f5ea3
commit 076b533ba3
1 changed files with 40 additions and 30 deletions

View File

@ -106,16 +106,16 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
}
// todo optimize the perf of find the trans objs by using hash table
static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent event) {
static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStreamTaskEvent event) {
int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans);
for (int32_t i = 0; i < numOfTrans; ++i) {
STaskStateTrans* pTrans = taosArrayGet(streamTaskSMTrans, i);
if (pTrans->state.state == pState->current.state && pTrans->event == event) {
if (pTrans->state.state == state && pTrans->event == event) {
return pTrans;
}
}
if (event == TASK_EVENT_CHECKPOINT_DONE && pState->current.state == TASK_STATUS__STOP) {
if (event == TASK_EVENT_CHECKPOINT_DONE && state == TASK_STATUS__STOP) {
} else {
ASSERT(0);
@ -181,21 +181,9 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM) {
return NULL;
}
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans) {
SStreamTask* pTask = pSM->pTask;
taosThreadMutexLock(&pTask->lock);
STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event);
if (pTrans == NULL) {
stWarn("s-task:%s status:%s not allowed handle event:%s", pTask->id.idStr, pSM->current.name, StreamTaskEventList[event].name);
taosThreadMutexUnlock(&pTask->lock);
return -1;
} else {
stDebug("s-task:%s start to handle event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[event].name,
pSM->current.name);
}
if (pTrans->attachEvent.event != 0) {
attachEvent(pTask, &pTrans->attachEvent);
taosThreadMutexUnlock(&pTask->lock);
@ -216,13 +204,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
}
}
} else {
if (pSM->pActiveTrans != NULL) {
ASSERT(!pSM->pActiveTrans->autoInvokeEndFn);
stWarn("s-task:%s status:%s handle event:%s is interrupted, handle the new event:%s", pTask->id.idStr,
pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name, StreamTaskEventList[event].name);
}
} else { // override current active trans
pSM->pActiveTrans = pTrans;
pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock);
@ -238,6 +220,34 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
SStreamTask* pTask = pSM->pTask;
STaskStateTrans* pTrans = NULL;
while (1) {
taosThreadMutexLock(&pTask->lock);
if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
taosThreadMutexUnlock(&pTask->lock);
taosMsleep(100);
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
pTask->id.idStr, pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name);
} else {
pTrans = streamTaskFindTransform(pSM->current.state, event);
if (pSM->pActiveTrans != NULL) {
// currently in some state transfer procedure, not auto invoke transfer, abort it
stDebug("s-task:%s handle event %s quit, status %s -> %s failed, handle event %s now", pTask->id.idStr,
StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name, pSM->pActiveTrans->next.name,
StreamTaskEventList[event].name);
}
doHandleEvent(pSM, event, pTrans);
break;
}
}
return TSDB_CODE_SUCCESS;
}
static void keepPrevInfo(SStreamTaskSM* pSM) {
STaskStateTrans* pTrans = pSM->pActiveTrans;
@ -283,7 +293,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
stDebug("s-task:%s handle the attached event:%s, state:%s", pTask->id.idStr,
StreamTaskEventList[pEvtInfo->event].name, pSM->current.name);
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM, pEvtInfo->event);
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event);
ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL);
pSM->pActiveTrans = pNextTrans;
@ -395,12 +405,12 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans);
// scan-history related event
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE,
streamTaskSetReadyForWal, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL,
NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE,
streamTaskSetReadyForWal, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL,
NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// halt stream task, from other task status