diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b0b5a64a50..818d8cfdd4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -272,24 +272,35 @@ typedef struct SCheckpointInfo { int64_t processedVer; int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t msgVer; - int32_t consensusTransId; // consensus checkpoint id SActiveCheckpointInfo* pActiveInfo; } SCheckpointInfo; +typedef enum { + TASK_CONSEN_CHKPT_REQ = 0x1, + TASK_CONSEN_CHKPT_SEND = 0x2, + TASK_CONSEN_CHKPT_RECV = 0x3, +} EConsenChkptStatus; + +typedef struct SConsenChkptInfo { +// bool alreadySendChkptId; + EConsenChkptStatus status; + int64_t statusTs; + int32_t consenChkptTransId; +} SConsenChkptInfo; + typedef struct SStreamStatus { - SStreamTaskSM* pSM; - int8_t taskStatus; - int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set - int8_t schedStatus; - int8_t statusBackup; - int32_t schedIdleTime; // idle time before invoke again - int32_t timerActive; // timer is active - int64_t lastExecTs; // last exec time stamp - int32_t inScanHistorySentinel; - bool appendTranstateBlock; // has append the transfer state data block already - bool removeBackendFiles; // remove backend files on disk when free stream tasks - bool sendConsensusChkptId; - bool requireConsensusChkptId; + SStreamTaskSM* pSM; + int8_t taskStatus; + int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set + int8_t schedStatus; + int8_t statusBackup; + int32_t schedIdleTime; // idle time before invoke again + int32_t timerActive; // timer is active + int64_t lastExecTs; // last exec time stamp + int32_t inScanHistorySentinel; + bool appendTranstateBlock; // has append the transfer state data block already + bool removeBackendFiles; // remove backend files on disk when free stream tasks + SConsenChkptInfo consenChkptInfo; } SStreamStatus; typedef struct SDataRange { @@ -774,6 +785,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); bool streamMetaAllTasksReady(const SStreamMeta* pMeta); int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask); +int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts); // timer int32_t streamTimerGetInstance(tmr_h* pTmr); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 820abcaea6..07490989f5 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -242,7 +242,6 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b280d62b3a..fae786ba19 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1291,6 +1291,3 @@ int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg); } -int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamProcessConsensusChkptRsp2(pTq->pStreamMeta, pMsg); -} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a63c15edfb..cad2ca3eb0 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1238,9 +1238,9 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMutexLock(&pTask->lock); ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId); - if (pTask->chkInfo.consensusTransId >= req.transId) { + if (pTask->status.consenChkptInfo.consenChkptTransId >= req.transId) { tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId, - pTask->chkInfo.consensusTransId, req.transId); + pTask->status.consenChkptInfo.consenChkptTransId, req.transId); streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; @@ -1256,7 +1256,9 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { pTask->id.idStr, vgId, req.checkpointId, req.transId); } - pTask->chkInfo.consensusTransId = req.transId; + pTask->status.consenChkptInfo.consenChkptTransId = req.transId; + pTask->status.consenChkptInfo.status = TASK_CONSEN_CHKPT_RECV; + pTask->status.consenChkptInfo.statusTs = taosGetTimestampMs(); streamMutexUnlock(&pTask->lock); if (pMeta->role == NODE_ROLE_LEADER) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 2c5d389a74..565a3e35e7 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1371,19 +1371,19 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { } int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { - const char* id = pTask->id.idStr; + const char* id = pTask->id.idStr; + SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo; streamMutexLock(&pTask->lock); ETaskStatus p = streamTaskGetStatus(pTask).state; - - if (pTask->status.sendConsensusChkptId == true) { - stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); - streamMutexUnlock(&pTask->lock); - return TSDB_CODE_SUCCESS; - } else { - pTask->status.sendConsensusChkptId = true; - } - +// if (pInfo->alreadySendChkptId == true) { +// stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); +// streamMutexUnlock(&pTask->lock); +// return TSDB_CODE_SUCCESS; +// } else { +// pInfo->alreadySendChkptId = true; +// } +// streamMutexUnlock(&pTask->lock); if (pTask->pBackend != NULL) { @@ -1391,8 +1391,9 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { pTask->pBackend = NULL; } - pTask->status.requireConsensusChkptId = true; - stDebug("s-task:%s set the require consensus-checkpointId flag", id); + pInfo->status = TASK_CONSEN_CHKPT_REQ; + pInfo->statusTs = taosGetTimestampMs(); + stDebug("s-task:%s set the require consensus-checkpointId flag, ts:%" PRId64, id, pInfo->statusTs); return 0; } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 73392fade0..ec65c274cf 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -197,10 +197,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { } } - if ((*pTask)->status.requireConsensusChkptId) { - entry.checkpointInfo.consensusChkptId = 1; - (*pTask)->status.requireConsensusChkptId = false; - stDebug("s-task:%s vgId:%d set requiring consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId); + entry.checkpointInfo.consensusChkptId = streamTaskSetReqConsensusChkptId(*pTask, pMsg->ts); + if (entry.checkpointInfo.consensusChkptId) { + entry.checkpointInfo.consensusTs = pMsg->ts; } if ((*pTask)->exec.pWalReader != NULL) { diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 2be0782f43..90987e3fba 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -120,7 +120,8 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); code = ret; - if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { + // do no added into result hashmap if it is failed due to concurrently starting of this stream task. + if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) { streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); } } @@ -195,9 +196,9 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 } // clear the send consensus-checkpointId flag - streamMutexLock(&(*p)->lock); - (*p)->status.sendConsensusChkptId = false; - streamMutexUnlock(&(*p)->lock); +// streamMutexLock(&(*p)->lock); +// (*p)->status.sendConsensusChkptId = false; +// streamMutexUnlock(&(*p)->lock); if (pStartInfo->startAllTasks != 1) { int64_t el = endTs - startTs; @@ -443,4 +444,27 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { return 0; } +int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) { + SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo; + + int32_t vgId = pTask->pMeta->vgId; + if (pConChkptInfo->status == TASK_CONSEN_CHKPT_REQ) { + pConChkptInfo->status = TASK_CONSEN_CHKPT_SEND; + pConChkptInfo->statusTs = ts; + stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr, + vgId, pConChkptInfo->statusTs); + return 1; + } else { + if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && (ts - pConChkptInfo->statusTs) > 60 * 1000) { + pConChkptInfo->statusTs = ts; + + stWarn("s-task:%s vgId:%d not recv consensus-chkptId for 60s, set requiring in Hb again, ts:%" PRId64, + pTask->id.idStr, vgId, pConChkptInfo->statusTs); + return 1; + } + } + + return 0; +} + diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a184314714..14a299b5ce 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -941,7 +941,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) { .checkpointInfo.latestSize = 0, .checkpointInfo.remoteBackup = 0, .checkpointInfo.consensusChkptId = 0, - .checkpointInfo.consensusTs = taosGetTimestampMs(), + .checkpointInfo.consensusTs = 0, .hTaskId = pTask->hTaskInfo.id.taskId, .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize), .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),