fix(stream): use hb to send the consens-checkpointid req.

This commit is contained in:
Haojun Liao 2024-07-08 09:05:33 +08:00
parent 6a1555e893
commit ce4153b6fc
11 changed files with 76 additions and 22 deletions

View File

@ -289,6 +289,7 @@ typedef struct SStreamStatus {
bool appendTranstateBlock; // has append the transfer state data block already
bool removeBackendFiles; // remove backend files on disk when free stream tasks
bool sendConsensusChkptId;
bool requireConsensusChkptId;
} SStreamStatus;
typedef struct SDataRange {
@ -568,14 +569,15 @@ typedef struct {
} SStreamScanHistoryReq;
typedef struct STaskCkptInfo {
int64_t latestId; // saved checkpoint id
int64_t latestVer; // saved checkpoint ver
int64_t latestTime; // latest checkpoint time
int64_t latestSize; // latest checkpoint size
int8_t remoteBackup; // latest checkpoint backup done
int64_t activeId; // current active checkpoint id
int32_t activeTransId; // checkpoint trans id
int8_t failed; // denote if the checkpoint is failed or not
int64_t latestId; // saved checkpoint id
int64_t latestVer; // saved checkpoint ver
int64_t latestTime; // latest checkpoint time
int64_t latestSize; // latest checkpoint size
int8_t remoteBackup; // latest checkpoint backup done
int64_t activeId; // current active checkpoint id
int32_t activeTransId; // checkpoint trans id
int8_t failed; // denote if the checkpoint is failed or not
int8_t consensusChkptId; // required the consensus-checkpointId
} STaskCkptInfo;
typedef struct STaskStatusEntry {

View File

@ -147,8 +147,7 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks);
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo,
SRpcHandleInfo *pRpcInfo);
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo);
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo);
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId);

View File

@ -2718,7 +2718,7 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", req.taskId, num,
numOfTasks);
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks);
mndAddConsensusTasks(pInfo, &req, &pMsg->info);
mndAddConsensusTasks(pInfo, &req);
taosThreadMutexUnlock(&execInfo.lock);
doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
@ -2737,7 +2737,7 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
// wait for 5s and check again
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks);
mndAddConsensusTasks(pInfo, &req, &pMsg->info);
mndAddConsensusTasks(pInfo, &req);
if (pStream != NULL) {
mndReleaseStream(pMnode, pStream);
@ -2789,7 +2789,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
taosArrayPush(pList, &pe->req.taskId);
streamId = pe->req.streamId;
} else {
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %2.fs already, wait for next round to check", pe->req.taskId,
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
(now - pe->ts) / 1000.0, pe->ts);
}
}

View File

@ -246,7 +246,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
tDecoderClear(&decoder);
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId);
mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId);
pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask));
@ -284,6 +284,23 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
continue;
}
STaskCkptInfo *pChkInfo = &p->checkpointInfo;
if (pChkInfo->consensusChkptId != 0) {
SRestoreCheckpointInfo cp = {
.streamId = p->id.streamId,
.taskId = p->id.taskId,
.checkpointId = p->checkpointInfo.latestId,
.startTs = pTaskEntry->startTime,
};
SStreamObj *pStream = mndGetStreamObj(pMnode, p->id.streamId);
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks);
mndAddConsensusTasks(pInfo, &cp);
mndReleaseStream(pMnode, pStream);
}
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
updateStageInfo(pTaskEntry, p->stage);
if (pTaskEntry->nodeId == SNODE_HANDLE) {
@ -292,7 +309,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} else {
streamTaskStatusCopy(pTaskEntry, p);
STaskCkptInfo *pChkInfo = &p->checkpointInfo;
if ((pChkInfo->activeId != 0) && pChkInfo->failed) {
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
pChkInfo->activeId, pChkInfo->activeTransId);
@ -304,6 +320,21 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// remove failed trans from pChkptStreams
taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId));
}
/* if (pChkInfo->consensusChkptId != 0) {
SRestoreCheckpointInfo cp = {
.streamId = p->id.streamId,
.taskId = p->id.taskId,
.checkpointId = p->checkpointInfo.latestId,
.startTs = pTaskEntry->startTime,
};
SStreamObj* pStream = mndGetStreamObj(pMnode, p->id.streamId);
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks);
mndAddConsensusTasks(pInfo, &cp, NULL);
mndReleaseStream(pMnode, pStream);
}*/
}
if (p->status == pTaskEntry->status) {

View File

@ -953,12 +953,21 @@ SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId,
// no matter existed or not, add the request into info list anyway, since we need to send rsp mannually
// discard the msg may lead to the lost of connections.
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcHandleInfo* pRpcInfo) {
SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs(), .rspInfo = *pRpcInfo};
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo) {
SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()};
memcpy(&info.req, pRestoreInfo, sizeof(info.req));
taosArrayPush(pInfo->pTaskList, &info);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) {
SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i);
if (p->req.taskId == info.req.taskId) {
mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64
", ignore this, total existed:%d",
pRestoreInfo->taskId, pRestoreInfo->streamId, (int32_t)taosArrayGetSize(pInfo->pTaskList));
return;
}
}
taosArrayPush(pInfo->pTaskList, &info);
int32_t num = taosArrayGetSize(pInfo->pTaskList);
mDebug("s-task:0x%x added into consensus-checkpointId list, stream:0x%" PRIx64 " total waiting:%d",
pRestoreInfo->taskId, pRestoreInfo->streamId, num);

View File

@ -742,6 +742,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
streamMetaStartAllTasks(pMeta);
} else {
streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
pMeta->startInfo.restartCount = 0;
streamMetaWUnLock(pMeta);
tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
}
@ -1160,7 +1161,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
return TSDB_CODE_SUCCESS;
}
#if 0
// discard the rsp, since it is expired.
if (req.startTs < pTask->execInfo.created) {
tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
@ -1170,7 +1171,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
#endif
tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode",
pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId);

View File

@ -1121,7 +1121,8 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
taosThreadMutexUnlock(&pTask->lock);
ASSERT(pTask->pBackend == NULL);
pTask->status.requireConsensusChkptId = true;
#if 0
SRestoreCheckpointInfo req = {
.streamId = pTask->id.streamId,
.taskId = pTask->id.taskId,
@ -1158,6 +1159,7 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
pInfo->checkpointId);
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
#endif
return 0;
}

View File

@ -194,6 +194,12 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
}
}
if ((*pTask)->status.requireConsensusChkptId) {
entry.checkpointInfo.consensusChkptId = 1;
(*pTask)->status.requireConsensusChkptId = false;
stDebug("s-task:%s vgId:%d set the require consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId);
}
if ((*pTask)->exec.pWalReader != NULL) {
entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1;
if (entry.processedVer < 0) {

View File

@ -1031,10 +1031,11 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
taosHashClear(pStartInfo->pFailedTaskSet);
pStartInfo->tasksWillRestart = 0;
pStartInfo->readyTs = 0;
pStartInfo->elapsedTime = 0;
// reset the sentinel flag value to be 0
pStartInfo->startAllTasks = 0;
stDebug("vgId:%d clear all start-all-task info", vgId);
stDebug("vgId:%d clear start-all-task info", vgId);
}
void streamMetaRLock(SStreamMeta* pMeta) {

View File

@ -349,6 +349,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestSize) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointInfo.consensusChkptId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1;
@ -403,6 +404,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize) < 0) return -1;
if (tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup) < 0) return -1;
if (tDecodeI8(pDecoder, &entry.checkpointInfo.consensusChkptId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1;

View File

@ -848,6 +848,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
.checkpointInfo.latestTime = pTask->chkInfo.checkpointTime,
.checkpointInfo.latestSize = 0,
.checkpointInfo.remoteBackup = 0,
.checkpointInfo.consensusChkptId = 0,
.hTaskId = pTask->hTaskInfo.id.taskId,
.procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
.outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),