fix(stream): check the set consensus-checkpointId ts.

This commit is contained in:
Haojun Liao 2024-07-08 14:33:28 +08:00
parent 647f9f47ef
commit 6e15c16cf7
7 changed files with 135 additions and 130 deletions

View File

@ -578,6 +578,7 @@ typedef struct STaskCkptInfo {
int32_t activeTransId; // checkpoint trans id
int8_t failed; // denote if the checkpoint is failed or not
int8_t consensusChkptId; // required the consensus-checkpointId
int64_t consensusTs; //
} STaskCkptInfo;
typedef struct STaskStatusEntry {
@ -588,8 +589,6 @@ typedef struct STaskStatusEntry {
int32_t nodeId;
SVersionRange verRange; // start/end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task
bool inputQChanging; // inputQ is changing or not
int64_t inputQUnchangeCounter;
double inputQUsed; // in MiB
double inputRate;
double procsThroughput; // duration between one element put into input queue and being processed.

View File

@ -59,7 +59,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList);
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg);
//static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg);
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
@ -2617,10 +2617,11 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
return 0;
}
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pExistedTasks) {
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pExistedTasks, bool *pAllSame) {
int32_t num = 0;
int64_t chkId = INT64_MAX;
*pExistedTasks = 0;
*pAllSame = true;
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId* p = taosArrayGet(execInfo.pTaskList, i);
@ -2631,6 +2632,9 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pEx
num += 1;
STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (chkId > pe->checkpointInfo.latestId) {
if (chkId != INT64_MAX) {
*pAllSame = false;
}
chkId = pe->checkpointInfo.latestId;
}
}
@ -2653,99 +2657,99 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId,
pInfo->handle = NULL; // disable auto rsp
}
/**
 * Handle a consensus-checkpointId request sent by a stream task.
 *
 * The request (SRestoreCheckpointInfo) is decoded from pMsg and then processed
 * while holding execInfo.lock:
 *   - stream known and its meta-stored checkpointId is 0: a set-consensus
 *     transaction with checkpointId 0 is created right away;
 *   - getConsensusId() returns -1, or a value that differs from the requested
 *     checkpointId: the request is registered through mndAddConsensusTasks()
 *     so that it can be examined again later;
 *   - getConsensusId() matches the requested checkpointId and the stream is
 *     known: a set-consensus transaction with that id is created.
 *
 * A quick response is sent on every path that gets past decoding, and the
 * stream object obtained from mndGetStreamObj() is released on every path.
 *
 * @param pMsg  rpc message carrying the encoded SRestoreCheckpointInfo
 * @return 0 when the request has been handled or queued; -1 (terrno set) when
 *         the message cannot be decoded, or when neither the stream nor the
 *         task can be found.
 */
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SDecoder               decoder = {0};
  SRestoreCheckpointInfo req = {0};

  tDecoderInit(&decoder, pMsg->pCont, pMsg->contLen);
  if (tDecodeRestoreCheckpointInfo(&decoder, &req)) {
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_INVALID_MSG;
    mError("invalid task consensus-checkpoint msg received");
    return -1;
  }
  tDecoderClear(&decoder);

  mDebug("receive stream task consensus-checkpoint msg, vgId:%d, s-task:0x%" PRIx64 "-0x%x, checkpointId:%" PRId64,
         req.nodeId, req.streamId, req.taskId, req.checkpointId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  taosThreadMutexLock(&execInfo.lock);

  // mnode handle the create stream transaction too slow may cause this problem
  SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
  if (pStream == NULL) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle consensus-checkpointId", req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle consensus-checkpointId", req.streamId);
      terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
      taosThreadMutexUnlock(&execInfo.lock);

      doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
      return -1;
    }

    mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
           req.streamId, req.taskId);
    // todo wait for stream is created
  } else {
    // only touch pStream when it really exists; the task-in-buffer path above reaches here with pStream == NULL
    mInfo("vgId:%d stream:0x%" PRIx64 " %s meta-stored checkpointId:%" PRId64, req.nodeId, req.streamId,
          pStream->name, pStream->checkpointId);
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
  if ((pStream != NULL) && (pStream->checkpointId == 0)) {  // not generated checkpoint yet, return 0 directly
    taosThreadMutexUnlock(&execInfo.lock);
    mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, 0, req.startTs);
    mndReleaseStream(pMnode, pStream);  // drop the reference taken by mndGetStreamObj()

    doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
    return TSDB_CODE_SUCCESS;
  }

  int32_t num = 0;
  int64_t chkId = getConsensusId(req.streamId, numOfTasks, &num);

  bool needWait = true;
  if (chkId == -1) {
    // some tasks not send hbMsg to mnode yet, wait for 5s.
    mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", num, numOfTasks,
           req.taskId);
  } else if ((pStream != NULL) && (chkId == req.checkpointId)) {
    mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64 ", meta-stored checkpointId:%" PRId64,
           req.nodeId, req.streamId, pStream->name, chkId, pStream->checkpointId);
    mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, chkId, req.startTs);
    needWait = false;
  }

  if (needWait) {
    // wait for 5s and check again
    SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks);
    mndAddConsensusTasks(pInfo, &req);
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  taosThreadMutexUnlock(&execInfo.lock);
  doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
  return 0;
}
//static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
// SMnode *pMnode = pMsg->info.node;
// SDecoder decoder = {0};
//
// SRestoreCheckpointInfo req = {0};
// tDecoderInit(&decoder, pMsg->pCont, pMsg->contLen);
//
// if (tDecodeRestoreCheckpointInfo(&decoder, &req)) {
// tDecoderClear(&decoder);
// terrno = TSDB_CODE_INVALID_MSG;
// mError("invalid task consensus-checkpoint msg received");
// return -1;
// }
// tDecoderClear(&decoder);
//
// mDebug("receive stream task consensus-checkpoint msg, vgId:%d, s-task:0x%" PRIx64 "-0x%x, checkpointId:%" PRId64,
// req.nodeId, req.streamId, req.taskId, req.checkpointId);
//
// // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
// taosThreadMutexLock(&execInfo.lock);
//
// // mnode handle the create stream transaction too slow may cause this problem
// SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
// if (pStream == NULL) {
// mWarn("failed to find the stream:0x%" PRIx64 ", not handle consensus-checkpointId", req.streamId);
//
// // not in meta-store yet, try to acquire the task in exec buffer
// // the checkpoint req arrives too soon before the completion of the create stream trans.
// STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
// void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
// if (p == NULL) {
// mError("failed to find the stream:0x%" PRIx64 " in buf, not handle consensus-checkpointId", req.streamId);
// terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
// taosThreadMutexUnlock(&execInfo.lock);
//
// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
// return -1;
// } else {
// mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
// req.streamId, req.taskId);
// // todo wait for stream is created
// }
// }
//
// mInfo("vgId:%d stream:0x%" PRIx64 " %s meta-stored checkpointId:%" PRId64, req.nodeId, req.streamId, pStream->name,
// pStream->checkpointId);
//
// int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
// if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly
// taosThreadMutexUnlock(&execInfo.lock);
// mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, 0, req.startTs);
//
// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
// return TSDB_CODE_SUCCESS;
// }
//
// int32_t num = 0;
// int64_t chkId = getConsensusId(req.streamId, numOfTasks, &num);
//
// // some tasks not send hbMsg to mnode yet, wait for 5s.
// if (chkId == -1) {
// mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", req.taskId, num,
// numOfTasks);
// SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks);
// mndAddConsensusTasks(pInfo, &req);
//
// taosThreadMutexUnlock(&execInfo.lock);
// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
// return 0;
// }
//
// if (chkId == req.checkpointId) {
// mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64 ", meta-stored checkpointId:%" PRId64,
// req.nodeId, req.streamId, pStream->name, chkId, pStream->checkpointId);
// mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, chkId, req.startTs);
//
// taosThreadMutexUnlock(&execInfo.lock);
// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
// return 0;
// }
//
// // wait for 5s and check again
// SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks);
// mndAddConsensusTasks(pInfo, &req);
//
// if (pStream != NULL) {
// mndReleaseStream(pMnode, pStream);
// }
//
// taosThreadMutexUnlock(&execInfo.lock);
// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
// return 0;
//}
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
@ -2753,6 +2757,15 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
mDebug("start to process consensus-checkpointId in tmr");
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(pNodeSnapshot);
if (!allReady) {
mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
return 0;
}
taosThreadMutexLock(&execInfo.lock);
void *pIter = NULL;
@ -2766,6 +2779,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
SStreamObj *pStream = mndGetStreamObj(pMnode, pInfo->streamId);
if (pStream == NULL) { // stream has been dropped already
mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
taosArrayDestroy(pList);
continue;
}
@ -2773,15 +2787,18 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
streamId = pe->req.streamId;
if ((now - pe->ts) >= 10 * 1000) {
int32_t existed = 0;
int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed);
bool allSame = true;
int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
if (chkId == -1) {
mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
pInfo->numOfTasks, pe->req.taskId);
break;
}
if (((now - pe->ts) >= 10 * 1000) || allSame) {
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
(now - pe->ts) / 1000.0, pe->ts);
ASSERT(chkId <= pe->req.checkpointId);
mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);

View File

@ -290,7 +290,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
.streamId = p->id.streamId,
.taskId = p->id.taskId,
.checkpointId = p->checkpointInfo.latestId,
.startTs = pTaskEntry->startTime,
.startTs = pChkInfo->consensusTs,
};
SStreamObj *pStream = mndGetStreamObj(pMnode, p->id.streamId);
@ -320,21 +320,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// remove failed trans from pChkptStreams
taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId));
}
/* if (pChkInfo->consensusChkptId != 0) {
SRestoreCheckpointInfo cp = {
.streamId = p->id.streamId,
.taskId = p->id.taskId,
.checkpointId = p->checkpointInfo.latestId,
.startTs = pTaskEntry->startTime,
};
SStreamObj* pStream = mndGetStreamObj(pMnode, p->id.streamId);
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks);
mndAddConsensusTasks(pInfo, &cp, NULL);
mndReleaseStream(pMnode, pStream);
}*/
}
if (p->status == pTaskEntry->status) {

View File

@ -87,7 +87,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
int32_t replica = -1; // do the replica check
*allReady = true;
SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry));
SArray *pVgroupList = taosArrayInit(4, sizeof(SNodeEntry));
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
@ -133,7 +133,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
epsetToStr(&entry.epset, buf, tListLen(buf));
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
taosArrayPush(pVgroupList, &entry);
sdbRelease(pSdb, pVgroup);
}
@ -152,11 +152,11 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
epsetToStr(&entry.epset, buf, tListLen(buf));
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
taosArrayPush(pVgroupList, &entry);
sdbRelease(pSdb, pObj);
}
return pVgroupListSnapshot;
return pVgroupList;
}
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
@ -960,9 +960,10 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) {
SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i);
if (p->req.taskId == info.req.taskId) {
mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64
", ignore this, total existed:%d",
mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64
"->%" PRId64 " total existed:%d",
pRestoreInfo->taskId, pRestoreInfo->streamId, (int32_t)taosArrayGetSize(pInfo->pTaskList));
p->req.startTs = info.req.startTs;
return;
}
}

View File

@ -1161,7 +1161,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
return TSDB_CODE_SUCCESS;
}
#if 0
// discard the rsp, since it is expired.
if (req.startTs < pTask->execInfo.created) {
tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
@ -1171,7 +1171,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
#endif
tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode",
pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId);

View File

@ -350,6 +350,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestSize) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointInfo.consensusChkptId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.consensusTs) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1;
@ -405,6 +406,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize) < 0) return -1;
if (tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup) < 0) return -1;
if (tDecodeI8(pDecoder, &entry.checkpointInfo.consensusChkptId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.consensusTs) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1;

View File

@ -849,6 +849,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
.checkpointInfo.latestSize = 0,
.checkpointInfo.remoteBackup = 0,
.checkpointInfo.consensusChkptId = 0,
.checkpointInfo.consensusTs = taosGetTimestampMs(),
.hTaskId = pTask->hTaskInfo.id.taskId,
.procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
.outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),