diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8ecd62d1eb..670f692c59 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1379,7 +1379,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { if (ret != TSDB_CODE_SUCCESS) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); code = ret; - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + + if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + } } streamMetaReleaseTask(pMeta, pTask); @@ -1486,6 +1489,8 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas return TSDB_CODE_SUCCESS; } + // the start all tasks procedure may happen to start the newly deployed stream task, and results in the + // concurrently start this task by two threads. streamMutexLock(&pTask->lock); SStreamTaskState status = streamTaskGetStatus(pTask); if (status.state != TASK_STATUS__UNINIT) { @@ -1516,12 +1521,17 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas streamMutexUnlock(&pTask->lock); } + // concurrently start task may cause the later started task be failed, and also failed to added into meta result. if (code == TSDB_CODE_SUCCESS) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT, tstrerror(code)); - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + + // do no added into result hashmap if it is failed due to concurrently starting of this stream task. + if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + } } } @@ -1584,6 +1594,14 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); if (code) { + if (code == TSDB_CODE_DUP_KEY) { + stError("vgId:%d record start task result failed, s-task:0x%x already exist start results in meta dst hashmap", + pMeta->vgId, id.taskId); + } else { + stError("vgId:%d failed to record start task:0x%x results, start all tasks failed", pMeta->vgId, id.taskId); + } + streamMetaWUnLock(pMeta); + return code; } int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 0779eede9f..c63df059af 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -405,7 +405,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { EStreamTaskEvent evt = pSM->pActiveTrans->event; streamMutexUnlock(&pTask->lock); - stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed", + stDebug("s-task:%s status:%s handling event:%s by another thread, wait for 100ms and check if completed", pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt)); taosMsleep(100); } else { @@ -418,6 +418,13 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { } if (pSM->pActiveTrans != NULL) { + // not allowed concurrently initialization + if (event == TASK_EVENT_INIT && pSM->pActiveTrans->event == TASK_EVENT_INIT) { + stError("s-task:%s already in handling init procedure, handle this init event failed", pTask->id.idStr); + code = TSDB_CODE_STREAM_INVALID_STATETRANS; + break; + } + // currently in some state transfer procedure, not auto invoke transfer, abort it stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now", pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,