diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 02e6a49bb6..4591c7fbcc 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -969,7 +969,8 @@ int32_t taosGetErrSize(); #define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103) #define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104) #define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105) -#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4106) +#define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106) +#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1a0b1e8665..2c5d389a74 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -586,7 +586,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); } else { // not in restore status, must be in checkpoint status - if (pStatus.state == TASK_STATUS__CK) { + if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) { stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, @@ -610,7 +610,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV } // update only it is in checkpoint status, or during restore procedure. - if (pStatus.state == TASK_STATUS__CK || (!restored)) { + if ((pStatus.state == TASK_STATUS__CK) || (!restored) || (pMeta->role == NODE_ROLE_FOLLOWER)) { pInfo->checkpointId = pReq->checkpointId; pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointTime = pReq->checkpointTs; diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 92aad2ece1..2be0782f43 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -385,11 +385,11 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas if (code == TSDB_CODE_SUCCESS) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT, + stError("s-task:%s vgId:%d failed to handle event:init-task, code:%s", pTask->id.idStr, pMeta->vgId, tstrerror(code)); // do no added into result hashmap if it is failed due to concurrently starting of this stream task. - if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { + if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) { streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); } } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 46fb7a521d..cbaccff01d 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -413,7 +413,8 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { // no active event trans exists, handle this event directly pTrans = streamTaskFindTransform(pSM->current.state, event); if (pTrans == NULL) { - stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event)); + stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event), + pSM->current.name); streamMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_INVALID_STATETRANS; } @@ -423,7 +424,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { if (event == TASK_EVENT_INIT && pSM->pActiveTrans->event == TASK_EVENT_INIT) { streamMutexUnlock(&pTask->lock); stError("s-task:%s already in handling init procedure, handle this init event failed", pTask->id.idStr); - return TSDB_CODE_STREAM_INVALID_STATETRANS; + return TSDB_CODE_STREAM_CONFLICT_EVENT; } // currently in some state transfer procedure, not auto invoke transfer, abort it diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6701842ec9..58dde5cd23 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -812,7 +812,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exi TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status to proceed") -TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode") // TDLite