From 1fa94a48276629764ef18132d1ae7072f9bc9adb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 28 Aug 2024 19:20:48 +0800 Subject: [PATCH 1/2] fix(stream): enable follower update the checkpoint info. --- include/util/taoserror.h | 3 ++- source/libs/stream/src/streamCheckpoint.c | 4 ++-- source/libs/stream/src/streamStartTask.c | 4 ++-- source/libs/stream/src/streamTaskSm.c | 5 +++-- source/util/src/terror.c | 3 ++- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 02e6a49bb6..4591c7fbcc 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -969,7 +969,8 @@ int32_t taosGetErrSize(); #define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103) #define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104) #define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105) -#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4106) +#define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106) +#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1a0b1e8665..2c5d389a74 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -586,7 +586,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); } else { // not in restore status, must be in checkpoint status - if (pStatus.state == TASK_STATUS__CK) { + if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) { stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, @@ -610,7 +610,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV } // update only it is in checkpoint status, or during restore procedure. - if (pStatus.state == TASK_STATUS__CK || (!restored)) { + if ((pStatus.state == TASK_STATUS__CK) || (!restored) || (pMeta->role == NODE_ROLE_FOLLOWER)) { pInfo->checkpointId = pReq->checkpointId; pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointTime = pReq->checkpointTs; diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 92aad2ece1..2be0782f43 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -385,11 +385,11 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas if (code == TSDB_CODE_SUCCESS) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT, + stError("s-task:%s vgId:%d failed to handle event:init-task, code:%s", pTask->id.idStr, pMeta->vgId, tstrerror(code)); // do no added into result hashmap if it is failed due to concurrently starting of this stream task. - if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { + if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) { streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); } } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 46fb7a521d..cbaccff01d 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -413,7 +413,8 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { // no active event trans exists, handle this event directly pTrans = streamTaskFindTransform(pSM->current.state, event); if (pTrans == NULL) { - stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event)); + stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event), + pSM->current.name); streamMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_INVALID_STATETRANS; } @@ -423,7 +424,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { if (event == TASK_EVENT_INIT && pSM->pActiveTrans->event == TASK_EVENT_INIT) { streamMutexUnlock(&pTask->lock); stError("s-task:%s already in handling init procedure, handle this init event failed", pTask->id.idStr); - return TSDB_CODE_STREAM_INVALID_STATETRANS; + return TSDB_CODE_STREAM_CONFLICT_EVENT; } // currently in some state transfer procedure, not auto invoke transfer, abort it diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6701842ec9..58dde5cd23 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -812,7 +812,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exi TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status to proceed") -TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode") // TDLite From 40669f4e9c86e0e8ca4d8d4eaab8ce88c91d46c1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 29 Aug 2024 11:14:58 +0800 Subject: [PATCH 2/2] fix(tsdb): 1. repeat send req consensus-checkpoint-id; 2.add more error code to deal with the init event failure, --- include/libs/stream/tstream.h | 40 ++++++++++++++-------- source/dnode/vnode/src/inc/vnodeInt.h | 1 - source/dnode/vnode/src/tq/tq.c | 3 -- source/dnode/vnode/src/tqCommon/tqCommon.c | 8 +++-- source/libs/stream/src/streamCheckpoint.c | 25 +++++++------- source/libs/stream/src/streamHb.c | 7 ++-- source/libs/stream/src/streamStartTask.c | 32 ++++++++++++++--- source/libs/stream/src/streamTask.c | 2 +- 8 files changed, 76 insertions(+), 42 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b0b5a64a50..818d8cfdd4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -272,24 +272,35 @@ typedef struct SCheckpointInfo { int64_t processedVer; int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t msgVer; - int32_t consensusTransId; // consensus checkpoint id SActiveCheckpointInfo* pActiveInfo; } SCheckpointInfo; +typedef enum { + TASK_CONSEN_CHKPT_REQ = 0x1, + TASK_CONSEN_CHKPT_SEND = 0x2, + TASK_CONSEN_CHKPT_RECV = 0x3, +} EConsenChkptStatus; + +typedef struct SConsenChkptInfo { +// bool alreadySendChkptId; + EConsenChkptStatus status; + int64_t statusTs; + int32_t consenChkptTransId; +} SConsenChkptInfo; + typedef struct SStreamStatus { - SStreamTaskSM* pSM; - int8_t taskStatus; - int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set - int8_t schedStatus; - int8_t statusBackup; - int32_t schedIdleTime; // idle time before invoke again - int32_t timerActive; // timer is active - int64_t lastExecTs; // last exec time stamp - int32_t inScanHistorySentinel; - bool appendTranstateBlock; // has append the transfer state data block already - bool removeBackendFiles; // remove backend files on disk when free stream tasks - bool sendConsensusChkptId; - bool requireConsensusChkptId; + SStreamTaskSM* pSM; + int8_t taskStatus; + int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set + int8_t schedStatus; + int8_t statusBackup; + int32_t schedIdleTime; // idle time before invoke again + int32_t timerActive; // timer is active + int64_t lastExecTs; // last exec time stamp + int32_t inScanHistorySentinel; + bool appendTranstateBlock; // has append the transfer state data block already + bool removeBackendFiles; // remove backend files on disk when free stream tasks + SConsenChkptInfo consenChkptInfo; } SStreamStatus; typedef struct SDataRange { @@ -774,6 +785,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); bool streamMetaAllTasksReady(const SStreamMeta* pMeta); int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask); +int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts); // timer int32_t streamTimerGetInstance(tmr_h* pTmr); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 820abcaea6..07490989f5 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -242,7 +242,6 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b280d62b3a..fae786ba19 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1291,6 +1291,3 @@ int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg); } -int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamProcessConsensusChkptRsp2(pTq->pStreamMeta, pMsg); -} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a63c15edfb..cad2ca3eb0 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1238,9 +1238,9 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMutexLock(&pTask->lock); ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId); - if (pTask->chkInfo.consensusTransId >= req.transId) { + if (pTask->status.consenChkptInfo.consenChkptTransId >= req.transId) { tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId, - pTask->chkInfo.consensusTransId, req.transId); + pTask->status.consenChkptInfo.consenChkptTransId, req.transId); streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; @@ -1256,7 +1256,9 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { pTask->id.idStr, vgId, req.checkpointId, req.transId); } - pTask->chkInfo.consensusTransId = req.transId; + pTask->status.consenChkptInfo.consenChkptTransId = req.transId; + pTask->status.consenChkptInfo.status = TASK_CONSEN_CHKPT_RECV; + pTask->status.consenChkptInfo.statusTs = taosGetTimestampMs(); streamMutexUnlock(&pTask->lock); if (pMeta->role == NODE_ROLE_LEADER) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 2c5d389a74..565a3e35e7 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1371,19 +1371,19 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { } int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { - const char* id = pTask->id.idStr; + const char* id = pTask->id.idStr; + SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo; streamMutexLock(&pTask->lock); ETaskStatus p = streamTaskGetStatus(pTask).state; - - if (pTask->status.sendConsensusChkptId == true) { - stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); - streamMutexUnlock(&pTask->lock); - return TSDB_CODE_SUCCESS; - } else { - pTask->status.sendConsensusChkptId = true; - } - +// if (pInfo->alreadySendChkptId == true) { +// stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); +// streamMutexUnlock(&pTask->lock); +// return TSDB_CODE_SUCCESS; +// } else { +// pInfo->alreadySendChkptId = true; +// } +// streamMutexUnlock(&pTask->lock); if (pTask->pBackend != NULL) { @@ -1391,8 +1391,9 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { pTask->pBackend = NULL; } - pTask->status.requireConsensusChkptId = true; - stDebug("s-task:%s set the require consensus-checkpointId flag", id); + pInfo->status = TASK_CONSEN_CHKPT_REQ; + pInfo->statusTs = taosGetTimestampMs(); + stDebug("s-task:%s set the require consensus-checkpointId flag, ts:%" PRId64, id, pInfo->statusTs); return 0; } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 73392fade0..ec65c274cf 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -197,10 +197,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { } } - if ((*pTask)->status.requireConsensusChkptId) { - entry.checkpointInfo.consensusChkptId = 1; - (*pTask)->status.requireConsensusChkptId = false; - stDebug("s-task:%s vgId:%d set requiring consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId); + entry.checkpointInfo.consensusChkptId = streamTaskSetReqConsensusChkptId(*pTask, pMsg->ts); + if (entry.checkpointInfo.consensusChkptId) { + entry.checkpointInfo.consensusTs = pMsg->ts; } if ((*pTask)->exec.pWalReader != NULL) { diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 2be0782f43..90987e3fba 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -120,7 +120,8 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); code = ret; - if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { + // do no added into result hashmap if it is failed due to concurrently starting of this stream task. + if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) { streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); } } @@ -195,9 +196,9 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 } // clear the send consensus-checkpointId flag - streamMutexLock(&(*p)->lock); - (*p)->status.sendConsensusChkptId = false; - streamMutexUnlock(&(*p)->lock); +// streamMutexLock(&(*p)->lock); +// (*p)->status.sendConsensusChkptId = false; +// streamMutexUnlock(&(*p)->lock); if (pStartInfo->startAllTasks != 1) { int64_t el = endTs - startTs; @@ -443,4 +444,27 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { return 0; } +int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) { + SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo; + + int32_t vgId = pTask->pMeta->vgId; + if (pConChkptInfo->status == TASK_CONSEN_CHKPT_REQ) { + pConChkptInfo->status = TASK_CONSEN_CHKPT_SEND; + pConChkptInfo->statusTs = ts; + stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr, + vgId, pConChkptInfo->statusTs); + return 1; + } else { + if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && (ts - pConChkptInfo->statusTs) > 60 * 1000) { + pConChkptInfo->statusTs = ts; + + stWarn("s-task:%s vgId:%d not recv consensus-chkptId for 60s, set requiring in Hb again, ts:%" PRId64, + pTask->id.idStr, vgId, pConChkptInfo->statusTs); + return 1; + } + } + + return 0; +} + diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a184314714..14a299b5ce 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -941,7 +941,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) { .checkpointInfo.latestSize = 0, .checkpointInfo.remoteBackup = 0, .checkpointInfo.consensusChkptId = 0, - .checkpointInfo.consensusTs = taosGetTimestampMs(), + .checkpointInfo.consensusTs = 0, .hTaskId = pTask->hTaskInfo.id.taskId, .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize), .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),