fix(stream): make concurrent handling of the init event fail.
This commit is contained in:
parent
895a9a1f3d
commit
c1ca6ce464
|
@ -1379,8 +1379,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
||||||
code = ret;
|
code = ret;
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
|
||||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
|
@ -1486,6 +1489,8 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the start-all-tasks procedure may happen to start the newly deployed stream task, which results in
|
||||||
|
// this task being started concurrently by two threads.
|
||||||
streamMutexLock(&pTask->lock);
|
streamMutexLock(&pTask->lock);
|
||||||
SStreamTaskState status = streamTaskGetStatus(pTask);
|
SStreamTaskState status = streamTaskGetStatus(pTask);
|
||||||
if (status.state != TASK_STATUS__UNINIT) {
|
if (status.state != TASK_STATUS__UNINIT) {
|
||||||
|
@ -1516,14 +1521,19 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// concurrently starting a task may cause the later start attempt to fail, and also fail to be added into the meta result.
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
|
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT,
|
stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
|
|
||||||
|
// do not add it into the result hashmap if it failed due to concurrent starting of this stream task.
|
||||||
|
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
|
||||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return code;
|
return code;
|
||||||
|
@ -1584,6 +1594,14 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
||||||
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
|
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
|
||||||
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
|
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
|
||||||
if (code) {
|
if (code) {
|
||||||
|
if (code == TSDB_CODE_DUP_KEY) {
|
||||||
|
stError("vgId:%d record start task result failed, s-task:0x%x already exist start results in meta dst hashmap",
|
||||||
|
pMeta->vgId, id.taskId);
|
||||||
|
} else {
|
||||||
|
stError("vgId:%d failed to record start task:0x%x results, start all tasks failed", pMeta->vgId, id.taskId);
|
||||||
|
}
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
|
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
|
||||||
|
|
|
@ -405,7 +405,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
||||||
EStreamTaskEvent evt = pSM->pActiveTrans->event;
|
EStreamTaskEvent evt = pSM->pActiveTrans->event;
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
|
stDebug("s-task:%s status:%s handling event:%s by another thread, wait for 100ms and check if completed",
|
||||||
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
|
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
} else {
|
} else {
|
||||||
|
@ -418,6 +418,13 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSM->pActiveTrans != NULL) {
|
if (pSM->pActiveTrans != NULL) {
|
||||||
|
// concurrent initialization is not allowed
|
||||||
|
if (event == TASK_EVENT_INIT && pSM->pActiveTrans->event == TASK_EVENT_INIT) {
|
||||||
|
stError("s-task:%s already in handling init procedure, handle this init event failed", pTask->id.idStr);
|
||||||
|
code = TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// currently in some state transfer procedure, not auto invoke transfer, abort it
|
// currently in some state transfer procedure, not auto invoke transfer, abort it
|
||||||
stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
|
stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
|
||||||
pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
|
pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
|
||||||
|
|
Loading…
Reference in New Issue