fix(stream): enable follower update the checkpoint info.

This commit is contained in:
Haojun Liao 2024-08-28 19:20:48 +08:00
parent c61a96656e
commit 1fa94a4827
5 changed files with 11 additions and 8 deletions

View File

@ -969,7 +969,8 @@ int32_t taosGetErrSize();
#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103)
#define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104)
#define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105)
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4106)
#define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106)
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107)
// TDLite
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)

View File

@ -586,7 +586,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
pInfo->checkpointTime, pReq->checkpointTs);
} else { // not in restore status, must be in checkpoint status
if (pStatus.state == TASK_STATUS__CK) {
if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
@ -610,7 +610,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
}
// update only it is in checkpoint status, or during restore procedure.
if (pStatus.state == TASK_STATUS__CK || (!restored)) {
if ((pStatus.state == TASK_STATUS__CK) || (!restored) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
pInfo->checkpointId = pReq->checkpointId;
pInfo->checkpointVer = pReq->checkpointVer;
pInfo->checkpointTime = pReq->checkpointTs;

View File

@ -385,11 +385,11 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT,
stError("s-task:%s vgId:%d failed to handle event:init-task, code:%s", pTask->id.idStr, pMeta->vgId,
tstrerror(code));
// do no added into result hashmap if it is failed due to concurrently starting of this stream task.
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
}

View File

@ -413,7 +413,8 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
// no active event trans exists, handle this event directly
pTrans = streamTaskFindTransform(pSM->current.state, event);
if (pTrans == NULL) {
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event),
pSM->current.name);
streamMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
@ -423,7 +424,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
if (event == TASK_EVENT_INIT && pSM->pActiveTrans->event == TASK_EVENT_INIT) {
streamMutexUnlock(&pTask->lock);
stError("s-task:%s already in handling init procedure, handle this init event failed", pTask->id.idStr);
return TSDB_CODE_STREAM_INVALID_STATETRANS;
return TSDB_CODE_STREAM_CONFLICT_EVENT;
}
// currently in some state transfer procedure, not auto invoke transfer, abort it

View File

@ -812,7 +812,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exi
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status to proceed")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode")
// TDLite