refactor(stream): limit the maximum number of consensus checkpoint trans.

This commit is contained in:
Haojun Liao 2025-02-11 10:31:11 +08:00
parent d08a81cc98
commit 267f7d3b08
2 changed files with 73 additions and 43 deletions

View File

@ -2587,100 +2587,10 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId,
}
}
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
int64_t now = taosGetTimestampMs();
SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
if (pStreamList == NULL) {
return terrno;
}
static int32_t doCleanReqList(SArray* pList, SCheckpointConsensusInfo* pInfo) {
int32_t alreadySend = taosArrayGetSize(pList);
mDebug("start to process consensus-checkpointId in tmr");
bool allReady = true;
SArray *pNodeSnapshot = NULL;
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
taosArrayDestroy(pNodeSnapshot);
if (code) {
mError("failed to get the vgroup snapshot, ignore it and continue");
}
if (!allReady) {
mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
taosArrayDestroy(pStreamList);
return 0;
}
streamMutexLock(&execInfo.lock);
void *pIter = NULL;
while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;
int64_t streamId = -1;
int32_t num = taosArrayGetSize(pInfo->pTaskList);
SArray *pList = taosArrayInit(4, sizeof(int32_t));
if (pList == NULL) {
continue;
}
SStreamObj *pStream = NULL;
code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
if (pStream == NULL || code != 0) { // stream has been dropped already
mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
void *p = taosArrayPush(pStreamList, &pInfo->streamId);
taosArrayDestroy(pList);
continue;
}
for (int32_t j = 0; j < num; ++j) {
SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
if (pe == NULL) {
continue;
}
streamId = pe->req.streamId;
int32_t existed = 0;
bool allSame = true;
int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
if (chkId == -1) {
mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
pInfo->numOfTasks, pe->req.taskId);
break;
}
if (((now - pe->ts) >= 10 * 1000) || allSame) {
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
pe->req.startTs, (now - pe->ts) / 1000.0);
if (chkId > pe->req.checkpointId) {
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pStreamList);
mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
pe->req.checkpointId, chkId);
return TSDB_CODE_FAILED;
}
code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
}
void *p = taosArrayPush(pList, &pe->req.taskId);
if (p == NULL) {
mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
}
streamId = pe->req.streamId;
} else {
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
pe->req.startTs, (now - pe->ts) / 1000.0);
}
}
mndReleaseStream(pMnode, pStream);
if (taosArrayGetSize(pList) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
for (int32_t i = 0; i < alreadySend; ++i) {
int32_t *taskId = taosArrayGet(pList, i);
if (taskId == NULL) {
continue;
@ -2694,23 +2604,140 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
}
}
}
return alreadySend;
}
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
int64_t now = taosGetTimestampMs();
bool allReady = true;
SArray *pNodeSnapshot = NULL;
int32_t maxAllowedTrans = 50;
int32_t numOfTrans = 0;
int32_t code = 0;
void *pIter = NULL;
SArray *pList = taosArrayInit(4, sizeof(int32_t));
if (pList == NULL) {
return terrno;
}
SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
if (pStreamList == NULL) {
taosArrayDestroy(pList);
return terrno;
}
mDebug("start to process consensus-checkpointId in tmr");
code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
taosArrayDestroy(pNodeSnapshot);
if (code) {
mError("failed to get the vgroup snapshot, ignore it and continue");
}
if (!allReady) {
mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
taosArrayDestroy(pStreamList);
taosArrayDestroy(pList);
return 0;
}
streamMutexLock(&execInfo.lock);
while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;
taosArrayClear(pList);
int64_t streamId = -1;
int32_t num = taosArrayGetSize(pInfo->pTaskList);
SStreamObj *pStream = NULL;
code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
if (pStream == NULL || code != 0) { // stream has been dropped already
mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
void *p = taosArrayPush(pStreamList, &pInfo->streamId);
if (p == NULL) {
mError("failed to record the missing stream id in concensus-stream list, streamId:%" PRId64
" code:%s, continue",
pInfo->streamId, tstrerror(terrno));
}
continue;
}
for (int32_t j = 0; j < num; ++j) {
SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
if (pe == NULL) {
continue;
}
if (streamId == -1) {
streamId = pe->req.streamId;
}
int32_t existed = 0;
bool allSame = true;
int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
if (chkId == -1) {
mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
pInfo->numOfTasks, pe->req.taskId);
break;
}
if (((now - pe->ts) >= 10 * 1000) && allSame) {
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
pe->req.startTs, (now - pe->ts) / 1000.0);
if (chkId > pe->req.checkpointId) {
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pStreamList);
mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
pe->req.checkpointId, chkId);
mndReleaseStream(pMnode, pStream);
taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
return TSDB_CODE_FAILED;
}
code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
}
void *p = taosArrayPush(pList, &pe->req.taskId);
if (p == NULL) {
mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
}
} else {
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
pe->req.startTs, (now - pe->ts) / 1000.0);
}
}
mndReleaseStream(pMnode, pStream);
int32_t alreadySend = doCleanReqList(pList, pInfo);
// clear request stream item with empty task list
if (taosArrayGetSize(pInfo->pTaskList) == 0) {
mndClearConsensusRspEntry(pInfo);
if (streamId == -1) {
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pStreamList);
mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId);
return TSDB_CODE_FAILED;
mError("streamId is -1, streamId:%" PRIx64" in consensus-checkpointId hashMap, cont", pInfo->streamId);
}
void *p = taosArrayPush(pStreamList, &streamId);
if (p == NULL) {
mError("failed to put into stream list, stream:0x%" PRIx64, streamId);
mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list", streamId);
}
}
numOfTrans += alreadySend;
if (numOfTrans > maxAllowedTrans) {
mInfo("already send consensus-checkpointId trans:%d, try next time", alreadySend);
taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
break;
}
}
for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
@ -2725,7 +2752,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pStreamList);
mDebug("end to process consensus-checkpointId in tmr");
taosArrayDestroy(pList);
mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans);
return code;
}

View File

@ -815,16 +815,17 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
int64_t ts) {
char msg[128] = {0};
STrans *pTrans = NULL;
SStreamTask *pTask = NULL;
snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId);
STrans *pTrans = NULL;
int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans);
if (pTrans == NULL || code != 0) {
return terrno;
}
STaskId id = {.streamId = pStream->uid, .taskId = taskId};
SStreamTask *pTask = NULL;
code = mndGetStreamTask(&id, pStream, &pTask);
if (code) {
mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name);