refactor(stream): kill too long checkpoint trans.

This commit is contained in:
Haojun Liao 2025-02-20 10:01:30 +08:00
parent 1c5e545337
commit 3048654484
3 changed files with 47 additions and 11 deletions

View File

@ -116,7 +116,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId);
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt);
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray* pSlowChkptTrans);
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock);
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);

View File

@ -1258,16 +1258,48 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
SStreamObj *pStream = NULL;
int32_t code = 0;
int32_t numOfCheckpointTrans = 0;
SArray *pLongChkpts = NULL;
SArray *pList = NULL;
if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
pList = taosArrayInit(4, sizeof(SCheckpointInterval));
if (pList == NULL) {
mError("failed to init chkptInterval info, not handle stream checkpoint, code:%s", tstrerror(terrno));
return terrno;
}
pLongChkpts = taosArrayInit(4, sizeof(int64_t));
if (pLongChkpts == NULL) {
mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(terrno));
taosArrayDestroy(pList);
return terrno;
}
// check if ongong checkpoint trans or long chkpt trans exist.
code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts);
if (code) {
mError("failed to clear finish trans, code:%s", tstrerror(code));
return code;
}
// kill long exec checkpoint and set task status
if (taosArrayGetSize(pLongChkpts) > 0) {
//todo:
for(int32_t i = 0; i < taosArrayGetSize(pLongChkpts); ++i) {
mndKillTransImpl(pMnode, xx, "");
mndCreateStreamResetStatusTrans(pMnode, pStream, chkptId);
}
taosArrayDestroy(pList);
taosArrayDestroy(pLongChkpts);
return TSDB_CODE_SUCCESS;
}
int64_t now = taosGetTimestampMs();
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
@ -1304,12 +1336,6 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
}
taosArraySort(pList, streamWaitComparFn);
code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans);
if (code) {
mError("failed to clear finish trans, code:%s", tstrerror(code));
taosArrayDestroy(pList);
return code;
}
int32_t numOfQual = taosArrayGetSize(pList);
if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {

View File

@ -31,11 +31,12 @@ int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t s
return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo));
}
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) {
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray* pSlowChkptTrans) {
size_t keyLen = 0;
void *pIter = NULL;
SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
int32_t numOfChkpt = 0;
int64_t now = taosGetTimestampMs();
if (pNumOfActiveChkpt != NULL) {
*pNumOfActiveChkpt = 0;
@ -63,6 +64,15 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
} else {
if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
numOfChkpt++;
// last for 10min, kill it
int64_t dur = now - pTrans->createdTime;
if ((dur >= 600 * 1000) && pSlowChkptTrans != NULL) {
mInfo("long chkpt transId:%d, start:%" PRId64
" exec duration:%.2fs, beyond threshold 10min, kill it and reset task status",
pTrans->id, pTrans->createdTime, dur / 1000.0);
taosArrayPush(pSlowChkptTrans, &pEntry->transId);
}
}
mndReleaseTrans(pMnode, pTrans);
}
@ -101,7 +111,7 @@ static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, cons
}
// if any task updates exist, any other stream trans are not allowed to be created
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
if (code) {
mError("failed to clear finish trans, code:%s, and continue", tstrerror(code));
}
@ -160,7 +170,7 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
return 0;
}
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
if (code) {
mError("failed to clear finish trans, code:%s", tstrerror(code));
}