fix(tsdb): 1. repeat send req consensus-checkpoint-id; 2.add more error code to deal with the init event failure,

This commit is contained in:
Haojun Liao 2024-08-29 11:14:58 +08:00
parent 1fa94a4827
commit 40669f4e9c
8 changed files with 76 additions and 42 deletions

View File

@ -272,24 +272,35 @@ typedef struct SCheckpointInfo {
int64_t processedVer; int64_t processedVer;
int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t msgVer; int64_t msgVer;
int32_t consensusTransId; // consensus checkpoint id
SActiveCheckpointInfo* pActiveInfo; SActiveCheckpointInfo* pActiveInfo;
} SCheckpointInfo; } SCheckpointInfo;
typedef enum {
TASK_CONSEN_CHKPT_REQ = 0x1,
TASK_CONSEN_CHKPT_SEND = 0x2,
TASK_CONSEN_CHKPT_RECV = 0x3,
} EConsenChkptStatus;
typedef struct SConsenChkptInfo {
// bool alreadySendChkptId;
EConsenChkptStatus status;
int64_t statusTs;
int32_t consenChkptTransId;
} SConsenChkptInfo;
typedef struct SStreamStatus { typedef struct SStreamStatus {
SStreamTaskSM* pSM; SStreamTaskSM* pSM;
int8_t taskStatus; int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus; int8_t schedStatus;
int8_t statusBackup; int8_t statusBackup;
int32_t schedIdleTime; // idle time before invoke again int32_t schedIdleTime; // idle time before invoke again
int32_t timerActive; // timer is active int32_t timerActive; // timer is active
int64_t lastExecTs; // last exec time stamp int64_t lastExecTs; // last exec time stamp
int32_t inScanHistorySentinel; int32_t inScanHistorySentinel;
bool appendTranstateBlock; // has append the transfer state data block already bool appendTranstateBlock; // has append the transfer state data block already
bool removeBackendFiles; // remove backend files on disk when free stream tasks bool removeBackendFiles; // remove backend files on disk when free stream tasks
bool sendConsensusChkptId; SConsenChkptInfo consenChkptInfo;
bool requireConsensusChkptId;
} SStreamStatus; } SStreamStatus;
typedef struct SDataRange { typedef struct SDataRange {
@ -774,6 +785,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta); bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask); int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask);
int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts);
// timer // timer
int32_t streamTimerGetInstance(tmr_h* pTmr); int32_t streamTimerGetInstance(tmr_h* pTmr);

View File

@ -242,7 +242,6 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver); int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver);

View File

@ -1291,6 +1291,3 @@ int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg); return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg);
} }
int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessConsensusChkptRsp2(pTq->pStreamMeta, pMsg);
}

View File

@ -1238,9 +1238,9 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId); ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
if (pTask->chkInfo.consensusTransId >= req.transId) { if (pTask->status.consenChkptInfo.consenChkptTransId >= req.transId) {
tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId, tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
pTask->chkInfo.consensusTransId, req.transId); pTask->status.consenChkptInfo.consenChkptTransId, req.transId);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1256,7 +1256,9 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
pTask->id.idStr, vgId, req.checkpointId, req.transId); pTask->id.idStr, vgId, req.checkpointId, req.transId);
} }
pTask->chkInfo.consensusTransId = req.transId; pTask->status.consenChkptInfo.consenChkptTransId = req.transId;
pTask->status.consenChkptInfo.status = TASK_CONSEN_CHKPT_RECV;
pTask->status.consenChkptInfo.statusTs = taosGetTimestampMs();
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
if (pMeta->role == NODE_ROLE_LEADER) { if (pMeta->role == NODE_ROLE_LEADER) {

View File

@ -1371,19 +1371,19 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
} }
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
ETaskStatus p = streamTaskGetStatus(pTask).state; ETaskStatus p = streamTaskGetStatus(pTask).state;
// if (pInfo->alreadySendChkptId == true) {
if (pTask->status.sendConsensusChkptId == true) { // stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id);
stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); // streamMutexUnlock(&pTask->lock);
streamMutexUnlock(&pTask->lock); // return TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS; // } else {
} else { // pInfo->alreadySendChkptId = true;
pTask->status.sendConsensusChkptId = true; // }
} //
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
if (pTask->pBackend != NULL) { if (pTask->pBackend != NULL) {
@ -1391,8 +1391,9 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
pTask->pBackend = NULL; pTask->pBackend = NULL;
} }
pTask->status.requireConsensusChkptId = true; pInfo->status = TASK_CONSEN_CHKPT_REQ;
stDebug("s-task:%s set the require consensus-checkpointId flag", id); pInfo->statusTs = taosGetTimestampMs();
stDebug("s-task:%s set the require consensus-checkpointId flag, ts:%" PRId64, id, pInfo->statusTs);
return 0; return 0;
} }

View File

@ -197,10 +197,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
} }
} }
if ((*pTask)->status.requireConsensusChkptId) { entry.checkpointInfo.consensusChkptId = streamTaskSetReqConsensusChkptId(*pTask, pMsg->ts);
entry.checkpointInfo.consensusChkptId = 1; if (entry.checkpointInfo.consensusChkptId) {
(*pTask)->status.requireConsensusChkptId = false; entry.checkpointInfo.consensusTs = pMsg->ts;
stDebug("s-task:%s vgId:%d set requiring consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId);
} }
if ((*pTask)->exec.pWalReader != NULL) { if ((*pTask)->exec.pWalReader != NULL) {

View File

@ -120,7 +120,8 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
code = ret; code = ret;
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { // do no added into result hashmap if it is failed due to concurrently starting of this stream task.
if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
} }
} }
@ -195,9 +196,9 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
} }
// clear the send consensus-checkpointId flag // clear the send consensus-checkpointId flag
streamMutexLock(&(*p)->lock); // streamMutexLock(&(*p)->lock);
(*p)->status.sendConsensusChkptId = false; // (*p)->status.sendConsensusChkptId = false;
streamMutexUnlock(&(*p)->lock); // streamMutexUnlock(&(*p)->lock);
if (pStartInfo->startAllTasks != 1) { if (pStartInfo->startAllTasks != 1) {
int64_t el = endTs - startTs; int64_t el = endTs - startTs;
@ -443,4 +444,27 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
return 0; return 0;
} }
int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) {
SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo;
int32_t vgId = pTask->pMeta->vgId;
if (pConChkptInfo->status == TASK_CONSEN_CHKPT_REQ) {
pConChkptInfo->status = TASK_CONSEN_CHKPT_SEND;
pConChkptInfo->statusTs = ts;
stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr,
vgId, pConChkptInfo->statusTs);
return 1;
} else {
if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && (ts - pConChkptInfo->statusTs) > 60 * 1000) {
pConChkptInfo->statusTs = ts;
stWarn("s-task:%s vgId:%d not recv consensus-chkptId for 60s, set requiring in Hb again, ts:%" PRId64,
pTask->id.idStr, vgId, pConChkptInfo->statusTs);
return 1;
}
}
return 0;
}

View File

@ -941,7 +941,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
.checkpointInfo.latestSize = 0, .checkpointInfo.latestSize = 0,
.checkpointInfo.remoteBackup = 0, .checkpointInfo.remoteBackup = 0,
.checkpointInfo.consensusChkptId = 0, .checkpointInfo.consensusChkptId = 0,
.checkpointInfo.consensusTs = taosGetTimestampMs(), .checkpointInfo.consensusTs = 0,
.hTaskId = pTask->hTaskInfo.id.taskId, .hTaskId = pTask->hTaskInfo.id.taskId,
.procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize), .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
.outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize), .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),