enh(stream): kill too long checkpoint trans.

This commit is contained in:
Haojun Liao 2025-02-20 15:07:24 +08:00
parent 3048654484
commit 99d6086c5a
3 changed files with 45 additions and 16 deletions

View File

@ -116,7 +116,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId);
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray* pSlowChkptTrans);
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray*pLongChkptTrans);
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock);
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
@ -159,6 +159,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
SVgroupChangeInfo *pInfo);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
void killChkptAndResetStreamTask(SMnode *pMnode, SArray *pLongChkpts);
bool isNodeUpdateTransActive();
int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter);

View File

@ -1260,6 +1260,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
int32_t numOfCheckpointTrans = 0;
SArray *pLongChkpts = NULL;
SArray *pList = NULL;
int64_t now = taosGetTimestampMs();
if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
@ -1271,7 +1272,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
return terrno;
}
pLongChkpts = taosArrayInit(4, sizeof(int64_t));
pLongChkpts = taosArrayInit(4, sizeof(SStreamTransInfo));
if (pLongChkpts == NULL) {
mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(terrno));
taosArrayDestroy(pList);
@ -1282,26 +1283,21 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts);
if (code) {
mError("failed to clear finish trans, code:%s", tstrerror(code));
taosArrayDestroy(pList);
taosArrayDestroy(pLongChkpts);
return code;
}
// kill long exec checkpoint and set task status
if (taosArrayGetSize(pLongChkpts) > 0) {
//todo:
for(int32_t i = 0; i < taosArrayGetSize(pLongChkpts); ++i) {
mndKillTransImpl(pMnode, xx, "");
mndCreateStreamResetStatusTrans(pMnode, pStream, chkptId);
}
killChkptAndResetStreamTask(pMnode, pLongChkpts);
taosArrayDestroy(pList);
taosArrayDestroy(pLongChkpts);
return TSDB_CODE_SUCCESS;
}
int64_t now = taosGetTimestampMs();
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
int64_t duration = now - pStream->checkpointFreq;
if (duration < tsStreamCheckpointInterval * 1000) {

View File

@ -16,6 +16,8 @@
#include "mndStream.h"
#include "mndTrans.h"
// Threshold, in milliseconds, on the elapsed time since a checkpoint trans was created; a trans at or
// beyond this threshold is collected into the long-checkpoint list by mndStreamClearFinishedTrans.
#define MAX_CHKPT_EXEC_ELAPSED (60*1000) // 60s
typedef struct SKeyInfo {
void *pKey;
int32_t keyLen;
@ -31,7 +33,7 @@ int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t s
return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo));
}
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray* pSlowChkptTrans) {
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray*pLongChkptTrans) {
size_t keyLen = 0;
void *pIter = NULL;
SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
@ -67,11 +69,11 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt,
// running for at least MAX_CHKPT_EXEC_ELAPSED: record it so that the caller can kill it and reset the task status
int64_t dur = now - pTrans->createdTime;
if ((dur >= 600 * 1000) && pSlowChkptTrans != NULL) {
if ((dur >= MAX_CHKPT_EXEC_ELAPSED) && (pLongChkptTrans != NULL)) {
mInfo("long chkpt transId:%d, start:%" PRId64
" exec duration:%.2fs, beyond threshold 10min, kill it and reset task status",
pTrans->id, pTrans->createdTime, dur / 1000.0);
taosArrayPush(pSlowChkptTrans, &pEntry->transId);
" exec duration:%.2fs, beyond threshold %.2f min, kill it and reset task status",
pTrans->id, pTrans->createdTime, dur / 1000.0, MAX_CHKPT_EXEC_ELAPSED/(1000*60.0));
taosArrayPush(pLongChkptTrans, pEntry);
}
}
mndReleaseTrans(pMnode, pTrans);
@ -371,3 +373,33 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
mDebug("complete clear checkpoints in all Dbs");
}
/**
 * Kill each long-running checkpoint trans in the list and create a reset-status
 * trans for the stream that owns it.
 *
 * @param pMnode      mnode handle, forwarded to the stream lookup, kill and reset calls.
 * @param pLongChkpts array of SStreamTransInfo entries describing the checkpoint
 *                    transactions to kill; the array itself is not modified or freed here.
 *
 * Processing is best-effort: a failure on one entry is logged and the remaining
 * entries are still handled. Nothing is returned to the caller.
 */
void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) {
  int64_t now = taosGetTimestampMs();
  int32_t num = taosArrayGetSize(pLongChkpts);

  mInfo("start to kill %d long checkpoint trans", num);

  for (int32_t i = 0; i < num; ++i) {
    SStreamTransInfo *pTrans = (SStreamTransInfo *)taosArrayGet(pLongChkpts, i);
    if (pTrans == NULL) {
      continue;
    }

    // elapsed time in seconds since the recorded start time of the trans
    double el = (now - pTrans->startTime) / 1000.0;
    mInfo("stream:%s id:%" PRIx64 " ongoing checkpoint trans, id:%d, elapsed time:%.2fs killed", pTrans->name,
          pTrans->streamId, pTrans->transId, el);

    SStreamObj *p = NULL;
    int32_t     code = mndGetStreamObj(pMnode, pTrans->streamId, &p);
    if (code != 0 || p == NULL) {
      // Without the stream object neither the kill nor the reset can be issued; make the skip visible.
      mError("stream:%s 0x%" PRIx64 " failed to get stream obj, checkpoint transId:%d not killed, code:%s",
             pTrans->name, pTrans->streamId, pTrans->transId, tstrerror(code));
      continue;
    }

    mndKillTransImpl(pMnode, pTrans->transId, p->sourceDb);

    mDebug("create reset task trans for stream:%s 0x%" PRIx64, pTrans->name, pTrans->streamId);
    code = mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId);
    if (code != 0) {
      // NOTE(review): assumes a non-zero return always means failure -- confirm against the callee.
      mError("stream:%s 0x%" PRIx64 " failed to create reset task trans after killing transId:%d, code:%s",
             pTrans->name, pTrans->streamId, pTrans->transId, tstrerror(code));
    }

    // release the reference obtained from mndGetStreamObj
    sdbRelease(pMnode->pSdb, p);
  }
}